mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 10:21:14 -05:00
@@ -34,8 +34,8 @@ from app.dao.services_dao import (
|
|||||||
)
|
)
|
||||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||||
from app.delivery.send_to_providers import provider_to_use
|
from app.delivery.send_to_providers import provider_to_use
|
||||||
from app.enums import NotificationType
|
from app.enums import NotificationType, JobStatus
|
||||||
from app.models import JOB_STATUS_ERROR, JOB_STATUS_IN_PROGRESS, JOB_STATUS_PENDING, Job
|
from app.models import Job
|
||||||
from app.notifications.process_notifications import send_notification_to_queue
|
from app.notifications.process_notifications import send_notification_to_queue
|
||||||
|
|
||||||
MAX_NOTIFICATION_FAILS = 10000
|
MAX_NOTIFICATION_FAILS = 10000
|
||||||
@@ -186,11 +186,11 @@ def check_job_status():
|
|||||||
thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)
|
thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)
|
||||||
|
|
||||||
incomplete_in_progress_jobs = Job.query.filter(
|
incomplete_in_progress_jobs = Job.query.filter(
|
||||||
Job.job_status == JOB_STATUS_IN_PROGRESS,
|
Job.job_status == JobStatus.IN_PROGRESS,
|
||||||
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
|
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
|
||||||
)
|
)
|
||||||
incomplete_pending_jobs = Job.query.filter(
|
incomplete_pending_jobs = Job.query.filter(
|
||||||
Job.job_status == JOB_STATUS_PENDING,
|
Job.job_status == JobStatus.PENDING,
|
||||||
Job.scheduled_for.isnot(None),
|
Job.scheduled_for.isnot(None),
|
||||||
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
|
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
|
||||||
)
|
)
|
||||||
@@ -205,7 +205,7 @@ def check_job_status():
|
|||||||
# if they haven't been re-processed in time.
|
# if they haven't been re-processed in time.
|
||||||
job_ids = []
|
job_ids = []
|
||||||
for job in jobs_not_complete_after_30_minutes:
|
for job in jobs_not_complete_after_30_minutes:
|
||||||
job.job_status = JOB_STATUS_ERROR
|
job.job_status = JobStatus.ERROR
|
||||||
dao_update_job(job)
|
dao_update_job(job)
|
||||||
job_ids.append(str(job.id))
|
job_ids.append(str(job.id))
|
||||||
|
|
||||||
|
|||||||
@@ -20,14 +20,8 @@ from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
|
|||||||
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
|
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
|
||||||
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
|
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
|
||||||
from app.dao.templates_dao import dao_get_template_by_id
|
from app.dao.templates_dao import dao_get_template_by_id
|
||||||
from app.enums import NotificationType
|
from app.enums import NotificationType, JobStatus
|
||||||
from app.models import (
|
from app.models import KEY_TYPE_NORMAL
|
||||||
JOB_STATUS_CANCELLED,
|
|
||||||
JOB_STATUS_FINISHED,
|
|
||||||
JOB_STATUS_IN_PROGRESS,
|
|
||||||
JOB_STATUS_PENDING,
|
|
||||||
KEY_TYPE_NORMAL,
|
|
||||||
)
|
|
||||||
from app.notifications.process_notifications import persist_notification
|
from app.notifications.process_notifications import persist_notification
|
||||||
from app.notifications.validators import check_service_over_total_message_limit
|
from app.notifications.validators import check_service_over_total_message_limit
|
||||||
from app.serialised_models import SerialisedService, SerialisedTemplate
|
from app.serialised_models import SerialisedService, SerialisedTemplate
|
||||||
@@ -46,17 +40,17 @@ def process_job(job_id, sender_id=None):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if job.job_status != JOB_STATUS_PENDING:
|
if job.job_status != JobStatus.PENDING:
|
||||||
return
|
return
|
||||||
|
|
||||||
service = job.service
|
service = job.service
|
||||||
|
|
||||||
job.job_status = JOB_STATUS_IN_PROGRESS
|
job.job_status = JobStatus.IN_PROGRESS
|
||||||
job.processing_started = start
|
job.processing_started = start
|
||||||
dao_update_job(job)
|
dao_update_job(job)
|
||||||
|
|
||||||
if not service.active:
|
if not service.active:
|
||||||
job.job_status = JOB_STATUS_CANCELLED
|
job.job_status = JobStatus.CANCELLED
|
||||||
dao_update_job(job)
|
dao_update_job(job)
|
||||||
current_app.logger.warning(
|
current_app.logger.warning(
|
||||||
"Job {} has been cancelled, service {} is inactive".format(
|
"Job {} has been cancelled, service {} is inactive".format(
|
||||||
@@ -85,7 +79,7 @@ def process_job(job_id, sender_id=None):
|
|||||||
|
|
||||||
|
|
||||||
def job_complete(job, resumed=False, start=None):
|
def job_complete(job, resumed=False, start=None):
|
||||||
job.job_status = JOB_STATUS_FINISHED
|
job.job_status = JobStatus.FINISHED
|
||||||
|
|
||||||
finished = datetime.utcnow()
|
finished = datetime.utcnow()
|
||||||
job.processing_finished = finished
|
job.processing_finished = finished
|
||||||
@@ -432,7 +426,7 @@ def process_incomplete_jobs(job_ids):
|
|||||||
|
|
||||||
# reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again
|
# reset the processing start time so that the check_job_status scheduled task doesn't pick this job up again
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
job.job_status = JOB_STATUS_IN_PROGRESS
|
job.job_status = JobStatus.IN_PROGRESS
|
||||||
job.processing_started = datetime.utcnow()
|
job.processing_started = datetime.utcnow()
|
||||||
dao_update_job(job)
|
dao_update_job(job)
|
||||||
|
|
||||||
|
|||||||
@@ -5,10 +5,8 @@ from flask import current_app
|
|||||||
from sqlalchemy import and_, asc, desc, func
|
from sqlalchemy import and_, asc, desc, func
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
|
from app.enums import JobStatus
|
||||||
from app.models import (
|
from app.models import (
|
||||||
JOB_STATUS_FINISHED,
|
|
||||||
JOB_STATUS_PENDING,
|
|
||||||
JOB_STATUS_SCHEDULED,
|
|
||||||
FactNotificationStatus,
|
FactNotificationStatus,
|
||||||
Job,
|
Job,
|
||||||
Notification,
|
Notification,
|
||||||
@@ -85,7 +83,7 @@ def dao_get_scheduled_job_stats(
|
|||||||
)
|
)
|
||||||
.filter(
|
.filter(
|
||||||
Job.service_id == service_id,
|
Job.service_id == service_id,
|
||||||
Job.job_status == JOB_STATUS_SCHEDULED,
|
Job.job_status == JobStatus.SCHEDULED,
|
||||||
)
|
)
|
||||||
.one()
|
.one()
|
||||||
)
|
)
|
||||||
@@ -111,7 +109,7 @@ def dao_set_scheduled_jobs_to_pending():
|
|||||||
"""
|
"""
|
||||||
jobs = (
|
jobs = (
|
||||||
Job.query.filter(
|
Job.query.filter(
|
||||||
Job.job_status == JOB_STATUS_SCHEDULED,
|
Job.job_status == JobStatus.SCHEDULED,
|
||||||
Job.scheduled_for < datetime.utcnow(),
|
Job.scheduled_for < datetime.utcnow(),
|
||||||
)
|
)
|
||||||
.order_by(asc(Job.scheduled_for))
|
.order_by(asc(Job.scheduled_for))
|
||||||
@@ -120,7 +118,7 @@ def dao_set_scheduled_jobs_to_pending():
|
|||||||
)
|
)
|
||||||
|
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
job.job_status = JOB_STATUS_PENDING
|
job.job_status = JobStatus.PENDING
|
||||||
|
|
||||||
db.session.add_all(jobs)
|
db.session.add_all(jobs)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
@@ -132,7 +130,7 @@ def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id):
|
|||||||
return Job.query.filter(
|
return Job.query.filter(
|
||||||
Job.service_id == service_id,
|
Job.service_id == service_id,
|
||||||
Job.id == job_id,
|
Job.id == job_id,
|
||||||
Job.job_status == JOB_STATUS_SCHEDULED,
|
Job.job_status == JobStatus.SCHEDULED,
|
||||||
Job.scheduled_for > datetime.utcnow(),
|
Job.scheduled_for > datetime.utcnow(),
|
||||||
).one()
|
).one()
|
||||||
|
|
||||||
@@ -200,7 +198,7 @@ def find_jobs_with_missing_rows():
|
|||||||
jobs_with_rows_missing = (
|
jobs_with_rows_missing = (
|
||||||
db.session.query(Job)
|
db.session.query(Job)
|
||||||
.filter(
|
.filter(
|
||||||
Job.job_status == JOB_STATUS_FINISHED,
|
Job.job_status == JobStatus.FINISHED,
|
||||||
Job.processing_finished < ten_minutes_ago,
|
Job.processing_finished < ten_minutes_ago,
|
||||||
Job.processing_finished > yesterday,
|
Job.processing_finished > yesterday,
|
||||||
Job.id == Notification.job_id,
|
Job.id == Notification.job_id,
|
||||||
|
|||||||
@@ -5,10 +5,8 @@ from flask import current_app
|
|||||||
from sqlalchemy import String, and_, desc, func, literal, text
|
from sqlalchemy import String, and_, desc, func, literal, text
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from app.enums import NotificationStatus, NotificationType
|
from app.enums import NotificationStatus, NotificationType, JobStatus
|
||||||
from app.models import (
|
from app.models import (
|
||||||
JOB_STATUS_CANCELLED,
|
|
||||||
JOB_STATUS_SCHEDULED,
|
|
||||||
Job,
|
Job,
|
||||||
Notification,
|
Notification,
|
||||||
ServiceDataRetention,
|
ServiceDataRetention,
|
||||||
@@ -52,7 +50,7 @@ def dao_get_uploads_by_service_id(service_id, limit_days=None, page=1, page_size
|
|||||||
Job.service_id == service_id,
|
Job.service_id == service_id,
|
||||||
Job.original_file_name != current_app.config["TEST_MESSAGE_FILENAME"],
|
Job.original_file_name != current_app.config["TEST_MESSAGE_FILENAME"],
|
||||||
Job.original_file_name != current_app.config["ONE_OFF_MESSAGE_FILENAME"],
|
Job.original_file_name != current_app.config["ONE_OFF_MESSAGE_FILENAME"],
|
||||||
Job.job_status.notin_([JOB_STATUS_CANCELLED, JOB_STATUS_SCHEDULED]),
|
Job.job_status.notin_([JobStatus.CANCELLED, JobStatus.SCHEDULED]),
|
||||||
func.coalesce(Job.processing_started, Job.created_at)
|
func.coalesce(Job.processing_started, Job.created_at)
|
||||||
>= today - func.coalesce(ServiceDataRetention.days_of_retention, 7),
|
>= today - func.coalesce(ServiceDataRetention.days_of_retention, 7),
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -178,7 +178,7 @@ class KeyType(Enum):
|
|||||||
TEST = "test"
|
TEST = "test"
|
||||||
|
|
||||||
|
|
||||||
class JobStatusType(Enum):
|
class JobStatus(Enum):
|
||||||
PENDING = "pending"
|
PENDING = "pending"
|
||||||
IN_PROGRESS = "in progress"
|
IN_PROGRESS = "in progress"
|
||||||
FINISHED = "finished"
|
FINISHED = "finished"
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ from app.dao.notifications_dao import (
|
|||||||
from app.dao.services_dao import dao_fetch_service_by_id
|
from app.dao.services_dao import dao_fetch_service_by_id
|
||||||
from app.dao.templates_dao import dao_get_template_by_id
|
from app.dao.templates_dao import dao_get_template_by_id
|
||||||
from app.errors import InvalidRequest, register_errors
|
from app.errors import InvalidRequest, register_errors
|
||||||
from app.models import JOB_STATUS_CANCELLED, JOB_STATUS_PENDING, JOB_STATUS_SCHEDULED
|
from app.enums import JobStatus
|
||||||
from app.schemas import (
|
from app.schemas import (
|
||||||
job_schema,
|
job_schema,
|
||||||
notification_with_template_schema,
|
notification_with_template_schema,
|
||||||
@@ -53,7 +53,7 @@ def get_job_by_service_and_job_id(service_id, job_id):
|
|||||||
@job_blueprint.route("/<job_id>/cancel", methods=["POST"])
|
@job_blueprint.route("/<job_id>/cancel", methods=["POST"])
|
||||||
def cancel_job(service_id, job_id):
|
def cancel_job(service_id, job_id):
|
||||||
job = dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id)
|
job = dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id)
|
||||||
job.job_status = JOB_STATUS_CANCELLED
|
job.job_status = JobStatus.CANCELLED
|
||||||
dao_update_job(job)
|
dao_update_job(job)
|
||||||
|
|
||||||
return get_job_by_service_and_job_id(service_id, job_id)
|
return get_job_by_service_and_job_id(service_id, job_id)
|
||||||
@@ -175,13 +175,13 @@ def create_job(service_id):
|
|||||||
job = job_schema.load(data)
|
job = job_schema.load(data)
|
||||||
|
|
||||||
if job.scheduled_for:
|
if job.scheduled_for:
|
||||||
job.job_status = JOB_STATUS_SCHEDULED
|
job.job_status = JobStatus.SCHEDULED
|
||||||
|
|
||||||
dao_create_job(job)
|
dao_create_job(job)
|
||||||
|
|
||||||
sender_id = data.get("sender_id")
|
sender_id = data.get("sender_id")
|
||||||
|
|
||||||
if job.job_status == JOB_STATUS_PENDING:
|
if job.job_status == JobStatus.PENDING:
|
||||||
process_job.apply_async(
|
process_job.apply_async(
|
||||||
[str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS
|
[str(job.id)], {"sender_id": sender_id}, queue=QueueNames.JOBS
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ from app.enums import (
|
|||||||
CallbackType,
|
CallbackType,
|
||||||
CodeType,
|
CodeType,
|
||||||
InvitedUserStatus,
|
InvitedUserStatus,
|
||||||
|
JobStatus,
|
||||||
KeyType,
|
KeyType,
|
||||||
NotificationStatus,
|
NotificationStatus,
|
||||||
NotificationType,
|
NotificationType,
|
||||||
@@ -1346,33 +1347,6 @@ class ProviderDetailsHistory(db.Model, HistoryModel):
|
|||||||
supports_international = db.Column(db.Boolean, nullable=False, default=False)
|
supports_international = db.Column(db.Boolean, nullable=False, default=False)
|
||||||
|
|
||||||
|
|
||||||
JOB_STATUS_PENDING = "pending"
|
|
||||||
JOB_STATUS_IN_PROGRESS = "in progress"
|
|
||||||
JOB_STATUS_FINISHED = "finished"
|
|
||||||
JOB_STATUS_SENDING_LIMITS_EXCEEDED = "sending limits exceeded"
|
|
||||||
JOB_STATUS_SCHEDULED = "scheduled"
|
|
||||||
JOB_STATUS_CANCELLED = "cancelled"
|
|
||||||
JOB_STATUS_READY_TO_SEND = "ready to send"
|
|
||||||
JOB_STATUS_SENT_TO_DVLA = "sent to dvla"
|
|
||||||
JOB_STATUS_ERROR = "error"
|
|
||||||
JOB_STATUS_TYPES = [
|
|
||||||
JOB_STATUS_PENDING,
|
|
||||||
JOB_STATUS_IN_PROGRESS,
|
|
||||||
JOB_STATUS_FINISHED,
|
|
||||||
JOB_STATUS_SENDING_LIMITS_EXCEEDED,
|
|
||||||
JOB_STATUS_SCHEDULED,
|
|
||||||
JOB_STATUS_CANCELLED,
|
|
||||||
JOB_STATUS_READY_TO_SEND,
|
|
||||||
JOB_STATUS_SENT_TO_DVLA,
|
|
||||||
JOB_STATUS_ERROR,
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class JobStatus(db.Model):
|
|
||||||
__tablename__ = "job_status"
|
|
||||||
|
|
||||||
name = db.Column(db.String(255), primary_key=True)
|
|
||||||
|
|
||||||
|
|
||||||
class Job(db.Model):
|
class Job(db.Model):
|
||||||
__tablename__ = "jobs"
|
__tablename__ = "jobs"
|
||||||
@@ -1423,8 +1397,7 @@ class Job(db.Model):
|
|||||||
)
|
)
|
||||||
scheduled_for = db.Column(db.DateTime, index=True, unique=False, nullable=True)
|
scheduled_for = db.Column(db.DateTime, index=True, unique=False, nullable=True)
|
||||||
job_status = db.Column(
|
job_status = db.Column(
|
||||||
db.String(255),
|
db.Enum(JobStatus, name="job_status"),
|
||||||
db.ForeignKey("job_status.name"),
|
|
||||||
index=True,
|
index=True,
|
||||||
nullable=False,
|
nullable=False,
|
||||||
default="pending",
|
default="pending",
|
||||||
|
|||||||
Reference in New Issue
Block a user