Added a new scheduled task that runs every minute, may want to run it every 3 minutes.

The tasks checks that a job is not still in progress 30 minutes after the job started processing.
This commit is contained in:
Rebecca Law
2017-10-12 16:21:08 +01:00
parent c068c922b4
commit e08690cad2
5 changed files with 104 additions and 66 deletions

View File

@@ -4,6 +4,7 @@ from datetime import (
)
from flask import current_app
from sqlalchemy import or_, and_
from sqlalchemy.exc import SQLAlchemyError
from notifications_utils.s3 import s3upload
@@ -17,8 +18,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da
from app.dao.jobs_dao import (
dao_get_letter_job_ids_by_status,
dao_set_scheduled_jobs_to_pending,
dao_get_jobs_older_than_limited_by
)
dao_get_jobs_older_than_limited_by,
dao_get_job_by_id)
from app.dao.monthly_billing_dao import (
get_service_ids_that_need_billing_populated,
create_or_update_monthly_billing
@@ -37,12 +38,14 @@ from app.dao.provider_details_dao import (
dao_toggle_sms_provider
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND
from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \
EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS
from app.notifications.process_notifications import send_notification_to_queue
from app.statsd_decorators import statsd
from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications
from app.config import QueueNames, TaskNames
from app.utils import convert_utc_to_bst
from app.v2.errors import JobIncompleteError
@notify_celery.task(name="remove_csv_files")
@@ -354,3 +357,30 @@ def run_letter_api_notifications():
QueueNames.PROCESS_FTP
)
)
@notify_celery.task(name='check-job-status')
@statsd(namespace="tasks")
def check_job_status():
"""
every x minutes do this check
select
from jobs
where job_status == 'in progress'
and template_type in ('sms', 'email')
and scheduled_at or created_at is older that 30 minutes.
if any results then
raise error
process the rows in the csv thatgit c are missing (in another task) just do the check here.
"""
thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30)
thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35)
jobs_not_complete_after_30_minutes = Job.query.filter(
Job.job_status == JOB_STATUS_IN_PROGRESS,
and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago)
).all()
job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
if job_ids:
raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))

View File

@@ -6,9 +6,15 @@ from flask import current_app
from notifications_utils.recipients import (
RecipientCSV
)
from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate, LetterDVLATemplate
from requests import HTTPError
from requests import request
from notifications_utils.template import (
SMSMessageTemplate,
WithSubjectTemplate,
LetterDVLATemplate
)
from requests import (
HTTPError,
request
)
from sqlalchemy.exc import SQLAlchemyError
from app import (
create_uuid,
@@ -53,7 +59,6 @@ from app.models import (
from app.notifications.process_notifications import persist_notification
from app.service.utils import service_allowed_to_send_to
from app.statsd_decorators import statsd
from app.v2.errors import JobIncompleteError
from notifications_utils.s3 import s3upload
@@ -79,6 +84,7 @@ def process_job(job_id):
return
job.job_status = JOB_STATUS_IN_PROGRESS
job.processing_started = start
dao_update_job(job)
db_template = dao_get_template_by_id(job.template_id, job.template_version)
@@ -88,8 +94,6 @@ def process_job(job_id):
current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS)
for row_number, recipient, personalisation in RecipientCSV(
s3.get_job_from_s3(str(service.id), str(job_id)),
template_type=template.template_type,
@@ -107,7 +111,6 @@ def process_job(job_id):
job.job_status = JOB_STATUS_FINISHED
finished = datetime.utcnow()
job.processing_started = start
job.processing_finished = finished
dao_update_job(job)
current_app.logger.info(
@@ -328,16 +331,6 @@ def update_dvla_job_to_error(self, job_id):
current_app.logger.info("Updated {} job to {}".format(job_id, JOB_STATUS_ERROR))
@notify_celery.task(bind=True, name='check-job-status', countdown=3600)
@statsd(namespace="tasks")
def check_job_status(self, job_id):
job = dao_get_job_by_id(job_id)
if (job.template.template_type == LETTER_TYPE and job.job_status != JOB_STATUS_SENT_TO_DVLA) or\
(job.template.template_type != LETTER_TYPE and job.job_status != JOB_STATUS_FINISHED):
raise JobIncompleteError("Job {} did not complete".format(job_id))
@notify_celery.task(bind=True, name='update-letter-notifications-to-sent')
@statsd(namespace="tasks")
def update_letter_notifications_to_sent_to_dvla(self, notification_references):