diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index 599d7b972..d73fe93d7 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -1,5 +1,5 @@
 import json
-from datetime import (datetime)
+from datetime import datetime
 from collections import namedtuple
 
 from flask import current_app
@@ -54,6 +54,7 @@ from app.models import (
 from app.notifications.process_notifications import persist_notification
 from app.service.utils import service_allowed_to_send_to
 from app.statsd_decorators import statsd
+from app.v2.errors import JobIncompleteError
 from notifications_utils.s3 import s3upload
 
 
@@ -88,6 +89,9 @@ def process_job(job_id):
 
     current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
 
+    if template.template_type != LETTER_TYPE:
+        check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS)
+
     for row_number, recipient, personalisation in RecipientCSV(
         s3.get_job_from_s3(str(service.id), str(job_id)),
         template_type=template.template_type,
@@ -95,21 +99,14 @@
     ).enumerated_recipients_and_personalisation:
         process_row(row_number, recipient, personalisation, template, job, service)
 
-    notification_total_in_db = dao_get_total_notifications_for_job_id(job.id)
-
-    if job.notification_count != notification_total_in_db:
-        current_app.logger.error("Job {} is missing {} notifications".format(
-            job.id, notification_total_in_db - notification_total_in_db))
-        job.job_status = JOB_STATUS_ERROR
-    else:
-        if template.template_type == LETTER_TYPE:
-            if service.research_mode:
-                update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
-            else:
-                build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
-                current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
+    if template.template_type == LETTER_TYPE:
+        if service.research_mode:
+            update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
         else:
-            job.job_status = JOB_STATUS_FINISHED
+            build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
+            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
+    else:
+        job.job_status = JOB_STATUS_FINISHED
 
     finished = datetime.utcnow()
     job.processing_started = start
@@ -333,6 +330,14 @@ def update_dvla_job_to_error(self, job_id):
     current_app.logger.info("Updated {} job to {}".format(job_id, JOB_STATUS_ERROR))
 
 
+@notify_celery.task(bind=True, name='check-job-status', countdown=3600)
+@statsd(namespace="tasks")
+def check_job_status(self, job_id):
+    job = dao_get_job_by_id(job_id)
+    if job.job_status != JOB_STATUS_FINISHED:
+        raise JobIncompleteError("Job {} did not complete".format(job_id))
+
+
 @notify_celery.task(bind=True, name='update-letter-notifications-to-sent')
 @statsd(namespace="tasks")
 def update_letter_notifications_to_sent_to_dvla(self, notification_references):
diff --git a/app/config.py b/app/config.py
index 9e7410471..394e77dbf 100644
--- a/app/config.py
+++ b/app/config.py
@@ -19,6 +19,7 @@ if os.environ.get('VCAP_SERVICES'):
 
 
 class QueueNames(object):
+    CHECK_JOBS = 'check-job-status-tasks'
     PERIODIC = 'periodic-tasks'
     PRIORITY = 'priority-tasks'
     DATABASE = 'database-tasks'
@@ -34,6 +35,7 @@ class QueueNames(object):
     @staticmethod
     def all_queues():
         return [
+            QueueNames.CHECK_JOBS,
             QueueNames.PRIORITY,
             QueueNames.PERIODIC,
             QueueNames.DATABASE,
diff --git a/app/v2/errors.py b/app/v2/errors.py
index bbec6d5e0..cefdeef87 100644
--- a/app/v2/errors.py
+++ b/app/v2/errors.py
@@ -10,10 +10,11 @@ from app.errors import InvalidRequest
 class JobIncompleteError(Exception):
     def __init__(self, message):
         self.message = message
+        self.status_code = 500
 
     def to_dict_v2(self):
         return {
-            'status_code': 500,
+            'status_code': self.status_code,
             "errors": [
                 {
                     "error": 'JobIncompleteError',
diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py
index cc01fc25c..beb62a780 100644
--- a/tests/app/celery/test_tasks.py
+++ b/tests/app/celery/test_tasks.py
@@ -16,6 +16,7 @@ from app.celery import tasks
 from app.celery.tasks import (
     s3,
     build_dvla_file,
+    check_job_status,
     create_dvla_file_contents_for_job,
     update_dvla_job_to_error,
     process_job,
@@ -30,6 +31,7 @@ from app.dao import jobs_dao, services_dao
 from app.models import (
     EMAIL_TYPE,
     JOB_STATUS_ERROR,
+    JOB_STATUS_FINISHED,
     KEY_TYPE_NORMAL,
     KEY_TYPE_TEAM,
     KEY_TYPE_TEST,
@@ -39,6 +41,7 @@ from app.models import (
     Job,
     Notification
 )
+from app.v2.errors import JobIncompleteError
 
 from tests.app import load_example_csv
 from tests.app.conftest import (
@@ -96,6 +99,7 @@ def email_job_with_placeholders(notify_db, notify_db_session, sample_email_templ
 
 @freeze_time("2016-01-01 11:09:00.061258")
 def test_should_process_sms_job(sample_job, mocker):
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
     mocker.patch('app.celery.tasks.send_sms.apply_async')
     mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -132,6 +136,7 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db,
     service = create_sample_service(notify_db, notify_db_session, limit=9)
     job = create_sample_job(notify_db, notify_db_session, service=service, notification_count=10)
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
     mocker.patch('app.celery.tasks.process_row')
     mocker.patch('app.celery.tasks.build_dvla_file')
@@ -153,6 +158,7 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify
 
     create_sample_notification(notify_db, notify_db_session, service=service, job=job)
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms'))
     mocker.patch('app.celery.tasks.process_row')
     mocker.patch('app.celery.tasks.build_dvla_file')
@@ -173,6 +179,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti
 
     create_sample_notification(notify_db, notify_db_session, service=service, job=job)
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3')
     mocker.patch('app.celery.tasks.process_row')
     mocker.patch('app.celery.tasks.build_dvla_file')
@@ -192,6 +199,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not
     template = create_sample_email_template(notify_db, notify_db_session, service=service)
     job = create_sample_job(notify_db, notify_db_session, service=service, template=template)
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3')
     mocker.patch('app.celery.tasks.process_row')
     mocker.patch('app.celery.tasks.build_dvla_file')
@@ -208,6 +216,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not
 def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker):
     job = create_sample_job(notify_db, notify_db_session, job_status='scheduled')
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3')
     mocker.patch('app.celery.tasks.process_row')
     mocker.patch('app.celery.tasks.build_dvla_file')
@@ -227,6 +236,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db,
     template = create_sample_email_template(notify_db, notify_db_session, service=service)
     job = create_sample_job(notify_db, notify_db_session, service=service, template=template, notification_count=10)
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email'))
     mocker.patch('app.celery.tasks.send_email.apply_async')
     mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -254,6 +264,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db,
 def test_should_not_create_send_task_for_empty_file(sample_job, mocker):
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty'))
     mocker.patch('app.celery.tasks.send_sms.apply_async')
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
 
     process_job(sample_job.id)
 
@@ -271,6 +282,7 @@ def test_should_process_email_job(email_job_with_placeholders, mocker):
     email_csv = """email_address,name
     test@test.com,foo
     """
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv)
     mocker.patch('app.celery.tasks.send_email.apply_async')
     mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -309,6 +321,7 @@ def test_should_process_letter_job(sample_letter_job, mocker):
     process_row_mock = mocker.patch('app.celery.tasks.process_row')
     mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
     mocker.patch('app.celery.tasks.build_dvla_file')
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
 
     process_job(sample_letter_job.id)
 
@@ -339,6 +352,7 @@ def test_should_process_letter_job(sample_letter_job, mocker):
 
 def test_should_process_all_sms_job(sample_job_with_placeholdered_template,
                                     mocker):
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
     mocker.patch('app.celery.tasks.send_sms.apply_async')
     mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
@@ -360,29 +374,6 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template,
     assert job.job_status == 'finished'
 
 
-def test_should_error_log_missing_notifications(
-        sample_job_with_placeholdered_template, mocker):
-    multiple_sms = load_example_csv('multiple_sms').strip()
-    num_phone_numbers_after_header = len(multiple_sms.split('\n')[1:])
-    sample_job_with_placeholdered_template.notification_count = num_phone_numbers_after_header
-
-    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=multiple_sms)
-    mocker.patch('app.celery.tasks.send_sms.apply_async')
-    mocker.patch('app.encryption.encrypt', return_value="something_encrypted")
-    mocker.patch('app.celery.tasks.create_uuid', return_value="uuid")
-    # deliberately return wrong total notifications to trigger error log
-    mocker.patch(
-        'app.celery.tasks.dao_get_total_notifications_for_job_id',
-        return_value=num_phone_numbers_after_header - 1
-    )
-    error_log = mocker.patch('app.celery.tasks.current_app.logger.error')
-
-    process_job(sample_job_with_placeholdered_template.id)
-
-    job = jobs_dao.dao_get_job_by_id(sample_job_with_placeholdered_template.id)
-    assert job.job_status == 'error'
-    assert error_log.called
-
 
 # -------------- process_row tests -------------- #
 
@@ -466,6 +457,7 @@ def test_should_put_send_sms_task_in_research_mode_queue_if_research_mode_servic
 
     notification = _notification_json(template, to="+447234123123")
 
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocked_deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
 
     notification_id = uuid.uuid4()
@@ -648,6 +640,7 @@ def test_should_not_build_dvla_file_in_research_mode_for_letter_job(
     csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name
     A1,A2,A3,A4,A_POST,Alice
     """
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv)
     mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async')
     mocker.patch('app.celery.tasks.persist_letter.apply_async')
@@ -670,6 +663,7 @@ def test_should_update_job_to_sent_to_dvla_in_research_mode_for_letter_job(
     csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name
     A1,A2,A3,A4,A_POST,Alice
     """
+    mocker.patch('app.celery.tasks.check_job_status.apply_async')
     mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv)
     mock_update_job_task = mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async')
     mocker.patch('app.celery.tasks.persist_letter.apply_async')
@@ -1229,3 +1223,17 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not
     send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id)
 
     mocked.call_count == 0
+
+
+def test_job_incomplete_raises_job_incomplete_error(sample_job):
+    assert sample_job.job_status != JOB_STATUS_FINISHED
+    with pytest.raises(JobIncompleteError) as e:
+        check_job_status(str(sample_job.id))
+
+    assert e.value.status_code == 500
+    assert e.value.message == 'Job {} did not complete'.format(sample_job.id)
+
+
+def test_job_finished_does_not_raises_job_incomplete_error(sample_job):
+    sample_job.job_status = JOB_STATUS_FINISHED
+    check_job_status(str(sample_job.id))
diff --git a/tests/app/test_config.py b/tests/app/test_config.py
index 3a40b4db4..0566bb6d6 100644
--- a/tests/app/test_config.py
+++ b/tests/app/test_config.py
@@ -63,7 +63,7 @@ def test_cloudfoundry_config_has_different_defaults():
 def test_queue_names_all_queues_correct():
     # Need to ensure that all_queues() only returns queue names used in API
     queues = QueueNames.all_queues()
-    assert len(queues) == 10
+    assert len(queues) == 11
     assert set([
         QueueNames.PRIORITY,
         QueueNames.PERIODIC,
@@ -74,5 +74,6 @@ def test_queue_names_all_queues_correct():
         QueueNames.STATISTICS,
         QueueNames.JOBS,
         QueueNames.RETRY,
-        QueueNames.NOTIFY
+        QueueNames.NOTIFY,
+        QueueNames.CHECK_JOBS,
     ]) == set(queues)