diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 002220798..422eb6d4d 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -5,6 +5,7 @@ from datetime import ( from celery.signals import worker_process_shutdown from flask import current_app +from sqlalchemy import or_, and_ from sqlalchemy.exc import SQLAlchemyError from notifications_utils.s3 import s3upload @@ -18,8 +19,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da from app.dao.jobs_dao import ( dao_get_letter_job_ids_by_status, dao_set_scheduled_jobs_to_pending, - dao_get_jobs_older_than_limited_by -) + dao_get_jobs_older_than_limited_by, + dao_get_job_by_id) from app.dao.monthly_billing_dao import ( get_service_ids_that_need_billing_populated, create_or_update_monthly_billing @@ -38,12 +39,14 @@ from app.dao.provider_details_dao import ( dao_toggle_sms_provider ) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago -from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND +from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \ + EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS from app.notifications.process_notifications import send_notification_to_queue from app.statsd_decorators import statsd from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications from app.config import QueueNames, TaskNames from app.utils import convert_utc_to_bst +from app.v2.errors import JobIncompleteError @worker_process_shutdown.connect @@ -360,3 +363,30 @@ def run_letter_api_notifications(): QueueNames.PROCESS_FTP ) ) + + +@notify_celery.task(name='check-job-status') +@statsd(namespace="tasks") +def check_job_status(): + """ + every x minutes do this check + select + from jobs + where job_status == 'in progress' + and template_type in ('sms', 'email') + and scheduled_at or created_at is older that 30 minutes. + if any results then + raise error + process the rows in the csv that are missing (in another task) just do the check here. + """ + thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30) + thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35) + + jobs_not_complete_after_30_minutes = Job.query.filter( + Job.job_status == JOB_STATUS_IN_PROGRESS, + and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) + ).all() + + job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes] + if job_ids: + raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids)) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index e4b00576e..45aa480dd 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,5 +1,5 @@ import json -from datetime import (datetime) +from datetime import datetime from collections import namedtuple from celery.signals import worker_process_init, worker_process_shutdown @@ -7,9 +7,15 @@ from flask import current_app from notifications_utils.recipients import ( RecipientCSV ) -from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate, LetterDVLATemplate -from requests import HTTPError -from requests import request +from notifications_utils.template import ( + SMSMessageTemplate, + WithSubjectTemplate, + LetterDVLATemplate +) +from requests import ( + HTTPError, + request +) from sqlalchemy.exc import SQLAlchemyError from app import ( create_uuid, @@ -84,6 +90,7 @@ def process_job(job_id): return job.job_status = JOB_STATUS_IN_PROGRESS + job.processing_started = start dao_update_job(job) db_template = dao_get_template_by_id(job.template_id, job.template_version) @@ -91,6 +98,8 @@ def process_job(job_id): TemplateClass = get_template_class(db_template.template_type) template = TemplateClass(db_template.__dict__) + current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) + for row_number, recipient, personalisation in RecipientCSV( s3.get_job_from_s3(str(service.id), str(job_id)), template_type=template.template_type, @@ -108,7 +117,6 @@ def process_job(job_id): job.job_status = JOB_STATUS_FINISHED finished = datetime.utcnow() - job.processing_started = start job.processing_finished = finished dao_update_job(job) current_app.logger.info( diff --git a/app/config.py b/app/config.py index 9e7410471..cd9e3cda9 100644 --- a/app/config.py +++ b/app/config.py @@ -234,6 +234,11 @@ class Config(object): 'task': 'run-letter-api-notifications', 'schedule': crontab(hour=17, minute=40), 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-job-status': { + 'task': 'check-job-status', + 'schedule': crontab(), + 'options': {'queue': QueueNames.PERIODIC} } } CELERY_QUEUES = [] diff --git a/app/v2/errors.py b/app/v2/errors.py index 0fa6bddac..cefdeef87 100644 --- a/app/v2/errors.py +++ b/app/v2/errors.py @@ -7,6 +7,23 @@ from app.authentication.auth import AuthError from app.errors import InvalidRequest +class JobIncompleteError(Exception): + def __init__(self, message): + self.message = message + self.status_code = 500 + + def to_dict_v2(self): + return { + 'status_code': self.status_code, + "errors": [ + { + "error": 'JobIncompleteError', + "message": self.message + } + ] + } + + class TooManyRequestsError(InvalidRequest): status_code = 429 message_template = 'Exceeded send limits ({}) for today' @@ -49,6 +66,10 @@ def register_errors(blueprint): current_app.logger.exception(error) return jsonify(json.loads(error.message)), 400 + @blueprint.errorhandler(JobIncompleteError) + def job_incomplete_error(error): + return jsonify(error.to_dict_v2()), 500 + @blueprint.errorhandler(NoResultFound) @blueprint.errorhandler(DataError) def no_result_found(e): diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 9d770888b..4c69f7ba8 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -29,7 +29,7 @@ from app.celery.scheduled_tasks import ( timeout_job_statistics, timeout_notifications, populate_monthly_billing, - send_total_sent_notifications_to_performance_platform) + send_total_sent_notifications_to_performance_platform, check_job_status) from app.clients.performance_platform.performance_platform_client import PerformancePlatformClient from app.config import QueueNames, TaskNames from app.dao.jobs_dao import dao_get_job_by_id @@ -47,8 +47,9 @@ from app.models import ( NOTIFICATION_PENDING, NOTIFICATION_CREATED, KEY_TYPE_TEST, - MonthlyBilling) + MonthlyBilling, JOB_STATUS_FINISHED) from app.utils import get_london_midnight_in_utc +from app.v2.errors import JobIncompleteError from tests.app.db import create_notification, create_service, create_template, create_job, create_rate from tests.app.conftest import ( sample_job as create_sample_job, @@ -743,7 +744,6 @@ def test_run_letter_api_notifications_triggers_ftp_task(client, mocker, sample_l def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( - client, mocker, sample_letter_template, sample_letter_job, @@ -753,7 +753,7 @@ def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( sample_letter_template, job=sample_letter_job ) - pending_letter_notification = create_notification( + create_notification( sample_letter_template, status=NOTIFICATION_PENDING, api_key=sample_api_key @@ -770,3 +770,55 @@ def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( assert not mock_celery.called assert letter_job_notification.status == NOTIFICATION_CREATED assert test_api_key_notification.status == NOTIFICATION_CREATED + + +def test_check_job_status_task_raises_job_incomplete_error(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + create_notification(template=sample_template, job=job) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + + +def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + + +def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + job_2 = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) in e.value.message + + +def test_check_job_status_task_does_not_raise_error(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_FINISHED) + job_2 = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_FINISHED) + check_job_status() diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 574ed2eb7..980622680 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -17,7 +17,6 @@ from app.celery.tasks import ( s3, build_dvla_file, create_dvla_file_contents_for_job, - update_dvla_job_to_error, process_job, process_row, send_sms, @@ -29,7 +28,6 @@ from app.config import QueueNames from app.dao import jobs_dao, services_dao from app.models import ( EMAIL_TYPE, - JOB_STATUS_ERROR, KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST, diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 3124b3a13..5ec851edc 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -52,7 +52,12 @@ from app.dao.notifications_dao import ( ) from app.dao.services_dao import dao_update_service -from tests.app.db import create_notification, create_api_key, create_reply_to_email +from tests.app.db import ( + create_api_key, + create_job, + create_notification, + create_reply_to_email +) from tests.app.conftest import ( sample_notification, sample_template, diff --git a/tests/app/v2/test_errors.py b/tests/app/v2/test_errors.py index 8345faacc..f08367f81 100644 --- a/tests/app/v2/test_errors.py +++ b/tests/app/v2/test_errors.py @@ -9,7 +9,7 @@ def app_for_test(mocker): import flask from flask import Blueprint from app.authentication.auth import AuthError - from app.v2.errors import BadRequestError, TooManyRequestsError + from app.v2.errors import BadRequestError, TooManyRequestsError, JobIncompleteError app = flask.Flask(__name__) app.config['TESTING'] = True @@ -39,6 +39,10 @@ def app_for_test(mocker): def raising_data_error(): raise DataError("There was a db problem", "params", "orig") + @blue.route("raise_job_incomplete_error", methods=["GET"]) + def raising_job_incomplete_error(): + raise JobIncompleteError("Raising job incomplete error") + @blue.route("raise_exception", methods=["GET"]) def raising_exception(): raise AssertionError("Raising any old exception") @@ -107,6 +111,16 @@ def test_data_errors(app_for_test): "errors": [{"error": "DataError", "message": "No result found"}]} +def test_job_incomplete_errors(app_for_test): + with app_for_test.test_request_context(): + with app_for_test.test_client() as client: + response = client.get(url_for('v2_under_test.raising_job_incomplete_error')) + assert response.status_code == 500 + error = json.loads(response.get_data(as_text=True)) + assert error == {"status_code": 500, + "errors": [{"error": "JobIncompleteError", "message": "Raising job incomplete error"}]} + + def test_internal_server_error_handler(app_for_test): with app_for_test.test_request_context(): with app_for_test.test_client() as client: