From dcf0d22d7b3772632d016eac89ae0cea0c1b367d Mon Sep 17 00:00:00 2001 From: Ken Tsang Date: Tue, 10 Oct 2017 15:04:55 +0100 Subject: [PATCH 1/7] Added alert when job.notification_count doesn't match total notification for job - Added log for when a job starts so that we will know when the processing of a job starts with the number of notifications - Added dao method to get total notifications for a job id - Added a test to check whether the number of notifications in the table matches the job notification_count --- app/celery/tasks.py | 24 +++++++++++++------ app/dao/notifications_dao.py | 10 ++++++++ tests/app/celery/test_tasks.py | 23 ++++++++++++++++++ .../notification_dao/test_notification_dao.py | 18 +++++++++++++- 4 files changed, 67 insertions(+), 8 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index a64bb9cae..599d7b972 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -29,6 +29,7 @@ from app.dao.jobs_dao import ( dao_update_job_status) from app.dao.notifications_dao import ( get_notification_by_id, + dao_get_total_notifications_for_job_id, dao_update_notifications_for_job_to_sent_to_dvla, dao_update_notifications_by_reference ) @@ -85,6 +86,8 @@ def process_job(job_id): TemplateClass = get_template_class(db_template.template_type) template = TemplateClass(db_template.__dict__) + current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) + for row_number, recipient, personalisation in RecipientCSV( s3.get_job_from_s3(str(service.id), str(job_id)), template_type=template.template_type, @@ -92,14 +95,21 @@ def process_job(job_id): ).enumerated_recipients_and_personalisation: process_row(row_number, recipient, personalisation, template, job, service) - if template.template_type == LETTER_TYPE: - if service.research_mode: - update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE) - else: - build_dvla_file.apply_async([str(job.id)], 
queue=QueueNames.JOBS) - current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS)) + notification_total_in_db = dao_get_total_notifications_for_job_id(job.id) + + if job.notification_count != notification_total_in_db: + current_app.logger.error("Job {} is missing {} notifications".format( + job.id, job.notification_count - notification_total_in_db)) + job.job_status = JOB_STATUS_ERROR else: - job.job_status = JOB_STATUS_FINISHED + if template.template_type == LETTER_TYPE: + if service.research_mode: + update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE) + else: + build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS) + current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS)) + else: + job.job_status = JOB_STATUS_FINISHED finished = datetime.utcnow() job.processing_started = start diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 4183c6f4c..72f9a9a89 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -442,6 +442,16 @@ def get_total_sent_notifications_in_date_range(start_date, end_date, notificatio return result or 0 +def dao_get_total_notifications_for_job_id(job_id): + result = db.session.query( + func.count(Notification.id).label('count') + ).filter( + Notification.job_id == job_id + ).scalar() + + return result or 0 + + def is_delivery_slow_for_provider( sent_at, provider, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 574ed2eb7..cc01fc25c 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -360,6 +360,29 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, assert job.job_status == 'finished' +def test_should_error_log_missing_notifications( + sample_job_with_placeholdered_template, mocker): + multiple_sms = load_example_csv('multiple_sms').strip() + 
num_phone_numbers_after_header = len(multiple_sms.split('\n')[1:]) + sample_job_with_placeholdered_template.notification_count = num_phone_numbers_after_header + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=multiple_sms) + mocker.patch('app.celery.tasks.send_sms.apply_async') + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + # deliberately return wrong total notifications to trigger error log + mocker.patch( + 'app.celery.tasks.dao_get_total_notifications_for_job_id', + return_value=num_phone_numbers_after_header - 1 + ) + error_log = mocker.patch('app.celery.tasks.current_app.logger.error') + + process_job(sample_job_with_placeholdered_template.id) + + job = jobs_dao.dao_get_job_by_id(sample_job_with_placeholdered_template.id) + assert job.job_status == 'error' + assert error_log.called + # -------------- process_row tests -------------- # diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 3124b3a13..818b44141 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -35,6 +35,7 @@ from app.dao.notifications_dao import ( dao_get_potential_notification_statistics_for_day, dao_get_scheduled_notifications, dao_get_template_usage, + dao_get_total_notifications_for_job_id, dao_timeout_notifications, dao_update_notification, dao_update_notifications_for_job_to_sent_to_dvla, @@ -52,7 +53,12 @@ from app.dao.notifications_dao import ( ) from app.dao.services_dao import dao_update_service -from tests.app.db import create_notification, create_api_key, create_reply_to_email +from tests.app.db import ( + create_api_key, + create_job, + create_notification, + create_reply_to_email +) from tests.app.conftest import ( sample_notification, sample_template, @@ -1995,3 +2001,13 @@ def 
test_dao_get_notification_ememail_reply_toail_reply_for_notification(sample_ def test_dao_get_notification_email_reply_for_notification_where_no_mapping(notify_db_session, fake_uuid): assert dao_get_notification_email_reply_for_notification(fake_uuid) is None + + +def test_dao_get_total_notifications_for_job_id(sample_job): + job = create_job(sample_job.template) + create_notification(sample_job.template, job=sample_job) + create_notification(sample_job.template, job=sample_job) + create_notification(sample_job.template, job=sample_job) + create_notification(sample_job.template, job=job) + + assert dao_get_total_notifications_for_job_id(sample_job.id) == 3 From 92508d3b96d101e3e7535ea7b68261837da51ff9 Mon Sep 17 00:00:00 2001 From: Ken Tsang Date: Wed, 11 Oct 2017 15:50:20 +0100 Subject: [PATCH 2/7] Create JobIncompleteError --- app/v2/errors.py | 20 ++++++++++++++++++++ tests/app/v2/test_errors.py | 16 +++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/app/v2/errors.py b/app/v2/errors.py index 0fa6bddac..bbec6d5e0 100644 --- a/app/v2/errors.py +++ b/app/v2/errors.py @@ -7,6 +7,22 @@ from app.authentication.auth import AuthError from app.errors import InvalidRequest +class JobIncompleteError(Exception): + def __init__(self, message): + self.message = message + + def to_dict_v2(self): + return { + 'status_code': 500, + "errors": [ + { + "error": 'JobIncompleteError', + "message": self.message + } + ] + } + + class TooManyRequestsError(InvalidRequest): status_code = 429 message_template = 'Exceeded send limits ({}) for today' @@ -49,6 +65,10 @@ def register_errors(blueprint): current_app.logger.exception(error) return jsonify(json.loads(error.message)), 400 + @blueprint.errorhandler(JobIncompleteError) + def job_incomplete_error(error): + return jsonify(error.to_dict_v2()), 500 + @blueprint.errorhandler(NoResultFound) @blueprint.errorhandler(DataError) def no_result_found(e): diff --git a/tests/app/v2/test_errors.py 
b/tests/app/v2/test_errors.py index 8345faacc..f08367f81 100644 --- a/tests/app/v2/test_errors.py +++ b/tests/app/v2/test_errors.py @@ -9,7 +9,7 @@ def app_for_test(mocker): import flask from flask import Blueprint from app.authentication.auth import AuthError - from app.v2.errors import BadRequestError, TooManyRequestsError + from app.v2.errors import BadRequestError, TooManyRequestsError, JobIncompleteError app = flask.Flask(__name__) app.config['TESTING'] = True @@ -39,6 +39,10 @@ def app_for_test(mocker): def raising_data_error(): raise DataError("There was a db problem", "params", "orig") + @blue.route("raise_job_incomplete_error", methods=["GET"]) + def raising_job_incomplete_error(): + raise JobIncompleteError("Raising job incomplete error") + @blue.route("raise_exception", methods=["GET"]) def raising_exception(): raise AssertionError("Raising any old exception") @@ -107,6 +111,16 @@ def test_data_errors(app_for_test): "errors": [{"error": "DataError", "message": "No result found"}]} +def test_job_incomplete_errors(app_for_test): + with app_for_test.test_request_context(): + with app_for_test.test_client() as client: + response = client.get(url_for('v2_under_test.raising_job_incomplete_error')) + assert response.status_code == 500 + error = json.loads(response.get_data(as_text=True)) + assert error == {"status_code": 500, + "errors": [{"error": "JobIncompleteError", "message": "Raising job incomplete error"}]} + + def test_internal_server_error_handler(app_for_test): with app_for_test.test_request_context(): with app_for_test.test_client() as client: From c29fc8cfa4116a94872deb38df5ec12cee7f13d2 Mon Sep 17 00:00:00 2001 From: Ken Tsang Date: Wed, 11 Oct 2017 18:14:56 +0100 Subject: [PATCH 3/7] Add celery task to check job finished - celery task called after countdown of 60 minutes from start of job processing --- app/celery/tasks.py | 35 ++++++++++++---------- app/config.py | 2 ++ app/v2/errors.py | 3 +- tests/app/celery/test_tasks.py | 54 
+++++++++++++++++++--------------- tests/app/test_config.py | 5 ++-- 5 files changed, 58 insertions(+), 41 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 599d7b972..d73fe93d7 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,5 +1,5 @@ import json -from datetime import (datetime) +from datetime import datetime from collections import namedtuple from flask import current_app @@ -54,6 +54,7 @@ from app.models import ( from app.notifications.process_notifications import persist_notification from app.service.utils import service_allowed_to_send_to from app.statsd_decorators import statsd +from app.v2.errors import JobIncompleteError from notifications_utils.s3 import s3upload @@ -88,6 +89,9 @@ def process_job(job_id): current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) + if template.template_type != LETTER_TYPE: + check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS) + for row_number, recipient, personalisation in RecipientCSV( s3.get_job_from_s3(str(service.id), str(job_id)), template_type=template.template_type, @@ -95,21 +99,14 @@ def process_job(job_id): ).enumerated_recipients_and_personalisation: process_row(row_number, recipient, personalisation, template, job, service) - notification_total_in_db = dao_get_total_notifications_for_job_id(job.id) - - if job.notification_count != notification_total_in_db: - current_app.logger.error("Job {} is missing {} notifications".format( - job.id, notification_total_in_db - notification_total_in_db)) - job.job_status = JOB_STATUS_ERROR - else: - if template.template_type == LETTER_TYPE: - if service.research_mode: - update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE) - else: - build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS) - current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS)) + if template.template_type == 
LETTER_TYPE: + if service.research_mode: + update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE) else: - job.job_status = JOB_STATUS_FINISHED + build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS) + current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS)) + else: + job.job_status = JOB_STATUS_FINISHED finished = datetime.utcnow() job.processing_started = start @@ -333,6 +330,14 @@ def update_dvla_job_to_error(self, job_id): current_app.logger.info("Updated {} job to {}".format(job_id, JOB_STATUS_ERROR)) +@notify_celery.task(bind=True, name='check-job-status', countdown=3600) +@statsd(namespace="tasks") +def check_job_status(self, job_id): + job = dao_get_job_by_id(job_id) + if job.job_status != JOB_STATUS_FINISHED: + raise JobIncompleteError("Job {} did not complete".format(job_id)) + + @notify_celery.task(bind=True, name='update-letter-notifications-to-sent') @statsd(namespace="tasks") def update_letter_notifications_to_sent_to_dvla(self, notification_references): diff --git a/app/config.py b/app/config.py index 9e7410471..394e77dbf 100644 --- a/app/config.py +++ b/app/config.py @@ -19,6 +19,7 @@ if os.environ.get('VCAP_SERVICES'): class QueueNames(object): + CHECK_JOBS = 'check-job-status-tasks' PERIODIC = 'periodic-tasks' PRIORITY = 'priority-tasks' DATABASE = 'database-tasks' @@ -34,6 +35,7 @@ class QueueNames(object): @staticmethod def all_queues(): return [ + QueueNames.CHECK_JOBS, QueueNames.PRIORITY, QueueNames.PERIODIC, QueueNames.DATABASE, diff --git a/app/v2/errors.py b/app/v2/errors.py index bbec6d5e0..cefdeef87 100644 --- a/app/v2/errors.py +++ b/app/v2/errors.py @@ -10,10 +10,11 @@ from app.errors import InvalidRequest class JobIncompleteError(Exception): def __init__(self, message): self.message = message + self.status_code = 500 def to_dict_v2(self): return { - 'status_code': 500, + 'status_code': self.status_code, "errors": [ { "error": 
'JobIncompleteError', diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index cc01fc25c..beb62a780 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -16,6 +16,7 @@ from app.celery import tasks from app.celery.tasks import ( s3, build_dvla_file, + check_job_status, create_dvla_file_contents_for_job, update_dvla_job_to_error, process_job, @@ -30,6 +31,7 @@ from app.dao import jobs_dao, services_dao from app.models import ( EMAIL_TYPE, JOB_STATUS_ERROR, + JOB_STATUS_FINISHED, KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST, @@ -39,6 +41,7 @@ from app.models import ( Job, Notification ) +from app.v2.errors import JobIncompleteError from tests.app import load_example_csv from tests.app.conftest import ( @@ -96,6 +99,7 @@ def email_job_with_placeholders(notify_db, notify_db_session, sample_email_templ @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_sms_job(sample_job, mocker): + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -132,6 +136,7 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, service = create_sample_service(notify_db, notify_db_session, limit=9) job = create_sample_job(notify_db, notify_db_session, service=service, notification_count=10) + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -153,6 +158,7 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify create_sample_notification(notify_db, notify_db_session, service=service, job=job) + 
mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -173,6 +179,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti create_sample_notification(notify_db, notify_db_session, service=service, job=job) + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -192,6 +199,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not template = create_sample_email_template(notify_db, notify_db_session, service=service) job = create_sample_job(notify_db, notify_db_session, service=service, template=template) + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -208,6 +216,7 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker): job = create_sample_job(notify_db, notify_db_session, job_status='scheduled') + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -227,6 +236,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, template = create_sample_email_template(notify_db, notify_db_session, service=service) job = create_sample_job(notify_db, notify_db_session, service=service, template=template, notification_count=10) + mocker.patch('app.celery.tasks.check_job_status.apply_async') 
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -254,6 +264,7 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, def test_should_not_create_send_task_for_empty_file(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty')) mocker.patch('app.celery.tasks.send_sms.apply_async') + mocker.patch('app.celery.tasks.check_job_status.apply_async') process_job(sample_job.id) @@ -271,6 +282,7 @@ def test_should_process_email_job(email_job_with_placeholders, mocker): email_csv = """email_address,name test@test.com,foo """ + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv) mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -309,6 +321,7 @@ def test_should_process_letter_job(sample_letter_job, mocker): process_row_mock = mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") mocker.patch('app.celery.tasks.build_dvla_file') + mocker.patch('app.celery.tasks.check_job_status.apply_async') process_job(sample_letter_job.id) @@ -339,6 +352,7 @@ def test_should_process_letter_job(sample_letter_job, mocker): def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mocker): + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -360,29 +374,6 @@ def test_should_process_all_sms_job(sample_job_with_placeholdered_template, assert job.job_status == 
'finished' -def test_should_error_log_missing_notifications( - sample_job_with_placeholdered_template, mocker): - multiple_sms = load_example_csv('multiple_sms').strip() - num_phone_numbers_after_header = len(multiple_sms.split('\n')[1:]) - sample_job_with_placeholdered_template.notification_count = num_phone_numbers_after_header - - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=multiple_sms) - mocker.patch('app.celery.tasks.send_sms.apply_async') - mocker.patch('app.encryption.encrypt', return_value="something_encrypted") - mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") - # deliberately return wrong total notifications to trigger error log - mocker.patch( - 'app.celery.tasks.dao_get_total_notifications_for_job_id', - return_value=num_phone_numbers_after_header - 1 - ) - error_log = mocker.patch('app.celery.tasks.current_app.logger.error') - - process_job(sample_job_with_placeholdered_template.id) - - job = jobs_dao.dao_get_job_by_id(sample_job_with_placeholdered_template.id) - assert job.job_status == 'error' - assert error_log.called - # -------------- process_row tests -------------- # @@ -466,6 +457,7 @@ def test_should_put_send_sms_task_in_research_mode_queue_if_research_mode_servic notification = _notification_json(template, to="+447234123123") + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocked_deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() @@ -648,6 +640,7 @@ def test_should_not_build_dvla_file_in_research_mode_for_letter_job( csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name A1,A2,A3,A4,A_POST,Alice """ + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv) mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async') mocker.patch('app.celery.tasks.persist_letter.apply_async') @@ -670,6 +663,7 @@ def 
test_should_update_job_to_sent_to_dvla_in_research_mode_for_letter_job( csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name A1,A2,A3,A4,A_POST,Alice """ + mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv) mock_update_job_task = mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async') mocker.patch('app.celery.tasks.persist_letter.apply_async') @@ -1229,3 +1223,17 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) mocked.call_count == 0 + + +def test_job_incomplete_raises_job_incomplete_error(sample_job): + assert sample_job.job_status != JOB_STATUS_FINISHED + with pytest.raises(JobIncompleteError) as e: + check_job_status(str(sample_job.id)) + + assert e.value.status_code == 500 + assert e.value.message == 'Job {} did not complete'.format(sample_job.id) + + +def test_job_finished_does_not_raises_job_incomplete_error(sample_job): + sample_job.job_status = JOB_STATUS_FINISHED + check_job_status(str(sample_job.id)) diff --git a/tests/app/test_config.py b/tests/app/test_config.py index 3a40b4db4..0566bb6d6 100644 --- a/tests/app/test_config.py +++ b/tests/app/test_config.py @@ -63,7 +63,7 @@ def test_cloudfoundry_config_has_different_defaults(): def test_queue_names_all_queues_correct(): # Need to ensure that all_queues() only returns queue names used in API queues = QueueNames.all_queues() - assert len(queues) == 10 + assert len(queues) == 11 assert set([ QueueNames.PRIORITY, QueueNames.PERIODIC, @@ -74,5 +74,6 @@ def test_queue_names_all_queues_correct(): QueueNames.STATISTICS, QueueNames.JOBS, QueueNames.RETRY, - QueueNames.NOTIFY + QueueNames.NOTIFY, + QueueNames.CHECK_JOBS, ]) == set(queues) From f9a7a78e55ff192d25f616bb7beee086b2efcfec Mon Sep 17 00:00:00 2001 From: Ken Tsang Date: Wed, 11 Oct 2017 21:48:47 +0100 Subject: [PATCH 4/7] 
Refactor code - removed redundant dao method - also handle letter jobs --- app/celery/tasks.py | 8 ++++---- app/dao/notifications_dao.py | 10 ---------- tests/app/celery/test_tasks.py | 13 +++++++++++-- .../dao/notification_dao/test_notification_dao.py | 11 ----------- 4 files changed, 15 insertions(+), 27 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index d73fe93d7..9122302d0 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -29,7 +29,6 @@ from app.dao.jobs_dao import ( dao_update_job_status) from app.dao.notifications_dao import ( get_notification_by_id, - dao_get_total_notifications_for_job_id, dao_update_notifications_for_job_to_sent_to_dvla, dao_update_notifications_by_reference ) @@ -89,8 +88,7 @@ def process_job(job_id): current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) - if template.template_type != LETTER_TYPE: - check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS) + check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS) for row_number, recipient, personalisation in RecipientCSV( s3.get_job_from_s3(str(service.id), str(job_id)), @@ -334,7 +332,9 @@ def update_dvla_job_to_error(self, job_id): @statsd(namespace="tasks") def check_job_status(self, job_id): job = dao_get_job_by_id(job_id) - if job.job_status != JOB_STATUS_FINISHED: + + if (job.template.template_type == LETTER_TYPE and job.job_status != JOB_STATUS_SENT_TO_DVLA) or\ + (job.template.template_type != LETTER_TYPE and job.job_status != JOB_STATUS_FINISHED): raise JobIncompleteError("Job {} did not complete".format(job_id)) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 72f9a9a89..4183c6f4c 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -442,16 +442,6 @@ def get_total_sent_notifications_in_date_range(start_date, end_date, notificatio return result or 0 -def dao_get_total_notifications_for_job_id(job_id): - result = 
db.session.query( - func.count(Notification.id).label('count') - ).filter( - Notification.job_id == job_id - ).scalar() - - return result or 0 - - def is_delivery_slow_for_provider( sent_at, provider, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index beb62a780..3b787e05e 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -32,6 +32,7 @@ from app.models import ( EMAIL_TYPE, JOB_STATUS_ERROR, JOB_STATUS_FINISHED, + JOB_STATUS_SENT_TO_DVLA, KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST, @@ -1234,6 +1235,14 @@ def test_job_incomplete_raises_job_incomplete_error(sample_job): assert e.value.message == 'Job {} did not complete'.format(sample_job.id) -def test_job_finished_does_not_raises_job_incomplete_error(sample_job): - sample_job.job_status = JOB_STATUS_FINISHED +@pytest.mark.parametrize('job_status,notification_type', + [ + (JOB_STATUS_FINISHED, SMS_TYPE), + (JOB_STATUS_SENT_TO_DVLA, LETTER_TYPE) + ] +) +def test_job_finished_does_not_raises_job_incomplete_error( + sample_job, job_status, notification_type): + sample_job.job_status = job_status + sample_job.template.template_type = notification_type check_job_status(str(sample_job.id)) diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 818b44141..5ec851edc 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -35,7 +35,6 @@ from app.dao.notifications_dao import ( dao_get_potential_notification_statistics_for_day, dao_get_scheduled_notifications, dao_get_template_usage, - dao_get_total_notifications_for_job_id, dao_timeout_notifications, dao_update_notification, dao_update_notifications_for_job_to_sent_to_dvla, @@ -2001,13 +2000,3 @@ def test_dao_get_notification_ememail_reply_toail_reply_for_notification(sample_ def 
test_dao_get_notification_email_reply_for_notification_where_no_mapping(notify_db_session, fake_uuid): assert dao_get_notification_email_reply_for_notification(fake_uuid) is None - - -def test_dao_get_total_notifications_for_job_id(sample_job): - job = create_job(sample_job.template) - create_notification(sample_job.template, job=sample_job) - create_notification(sample_job.template, job=sample_job) - create_notification(sample_job.template, job=sample_job) - create_notification(sample_job.template, job=job) - - assert dao_get_total_notifications_for_job_id(sample_job.id) == 3 From e08690cad2e3bd566c736afd41dbf1b2983cc6d0 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 12 Oct 2017 16:21:08 +0100 Subject: [PATCH 5/7] Added a new scheduled task that runs every minute, may want to run it every 3 minutes. The tasks checks that a job is not still in progress 30 minutes after the job started processing. --- app/celery/scheduled_tasks.py | 36 ++++++++++++-- app/celery/tasks.py | 27 ++++------- app/config.py | 5 ++ tests/app/celery/test_scheduled_tasks.py | 60 ++++++++++++++++++++++-- tests/app/celery/test_tasks.py | 42 ----------------- 5 files changed, 104 insertions(+), 66 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index f014c65ed..79afb4153 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -4,6 +4,7 @@ from datetime import ( ) from flask import current_app +from sqlalchemy import or_, and_ from sqlalchemy.exc import SQLAlchemyError from notifications_utils.s3 import s3upload @@ -17,8 +18,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da from app.dao.jobs_dao import ( dao_get_letter_job_ids_by_status, dao_set_scheduled_jobs_to_pending, - dao_get_jobs_older_than_limited_by -) + dao_get_jobs_older_than_limited_by, + dao_get_job_by_id) from app.dao.monthly_billing_dao import ( get_service_ids_that_need_billing_populated, create_or_update_monthly_billing @@ 
-37,12 +38,14 @@ from app.dao.provider_details_dao import ( dao_toggle_sms_provider ) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago -from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND +from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \ + EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS from app.notifications.process_notifications import send_notification_to_queue from app.statsd_decorators import statsd from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications from app.config import QueueNames, TaskNames from app.utils import convert_utc_to_bst +from app.v2.errors import JobIncompleteError @notify_celery.task(name="remove_csv_files") @@ -354,3 +357,30 @@ def run_letter_api_notifications(): QueueNames.PROCESS_FTP ) ) + + +@notify_celery.task(name='check-job-status') +@statsd(namespace="tasks") +def check_job_status(): + """ + every x minutes do this check + select + from jobs + where job_status == 'in progress' + and template_type in ('sms', 'email') + and scheduled_at or created_at is older than 30 minutes. + if any results then + raise error + process the rows in the csv that are missing (in another task) just do the check here. 
+ """ + thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30) + thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35) + + jobs_not_complete_after_30_minutes = Job.query.filter( + Job.job_status == JOB_STATUS_IN_PROGRESS, + and_(thirty_five_minutes_ago < Job.processing_started, Job.processing_started < thirty_minutes_ago) + ).all() + + job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes] + if job_ids: + raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids)) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 9122302d0..0279c321f 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -6,9 +6,15 @@ from flask import current_app from notifications_utils.recipients import ( RecipientCSV ) -from notifications_utils.template import SMSMessageTemplate, WithSubjectTemplate, LetterDVLATemplate -from requests import HTTPError -from requests import request +from notifications_utils.template import ( + SMSMessageTemplate, + WithSubjectTemplate, + LetterDVLATemplate +) +from requests import ( + HTTPError, + request +) from sqlalchemy.exc import SQLAlchemyError from app import ( create_uuid, @@ -53,7 +59,6 @@ from app.models import ( from app.notifications.process_notifications import persist_notification from app.service.utils import service_allowed_to_send_to from app.statsd_decorators import statsd -from app.v2.errors import JobIncompleteError from notifications_utils.s3 import s3upload @@ -79,6 +84,7 @@ def process_job(job_id): return job.job_status = JOB_STATUS_IN_PROGRESS + job.processing_started = start dao_update_job(job) db_template = dao_get_template_by_id(job.template_id, job.template_version) @@ -88,8 +94,6 @@ def process_job(job_id): current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count)) - check_job_status.apply_async([str(job.id)], queue=QueueNames.JOBS) - for row_number, recipient, personalisation in RecipientCSV( 
s3.get_job_from_s3(str(service.id), str(job_id)), template_type=template.template_type, @@ -107,7 +111,6 @@ def process_job(job_id): job.job_status = JOB_STATUS_FINISHED finished = datetime.utcnow() - job.processing_started = start job.processing_finished = finished dao_update_job(job) current_app.logger.info( @@ -328,16 +331,6 @@ def update_dvla_job_to_error(self, job_id): current_app.logger.info("Updated {} job to {}".format(job_id, JOB_STATUS_ERROR)) -@notify_celery.task(bind=True, name='check-job-status', countdown=3600) -@statsd(namespace="tasks") -def check_job_status(self, job_id): - job = dao_get_job_by_id(job_id) - - if (job.template.template_type == LETTER_TYPE and job.job_status != JOB_STATUS_SENT_TO_DVLA) or\ - (job.template.template_type != LETTER_TYPE and job.job_status != JOB_STATUS_FINISHED): - raise JobIncompleteError("Job {} did not complete".format(job_id)) - - @notify_celery.task(bind=True, name='update-letter-notifications-to-sent') @statsd(namespace="tasks") def update_letter_notifications_to_sent_to_dvla(self, notification_references): diff --git a/app/config.py b/app/config.py index 394e77dbf..26e8d47cf 100644 --- a/app/config.py +++ b/app/config.py @@ -236,6 +236,11 @@ class Config(object): 'task': 'run-letter-api-notifications', 'schedule': crontab(hour=17, minute=40), 'options': {'queue': QueueNames.PERIODIC} + }, + 'check-job-status': { + 'task': 'check-job-status', + 'schedule': crontab(), + 'options': {'queue': QueueNames.PERIODIC} } } CELERY_QUEUES = [] diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 9d770888b..4c69f7ba8 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -29,7 +29,7 @@ from app.celery.scheduled_tasks import ( timeout_job_statistics, timeout_notifications, populate_monthly_billing, - send_total_sent_notifications_to_performance_platform) + send_total_sent_notifications_to_performance_platform, check_job_status) 
from app.clients.performance_platform.performance_platform_client import PerformancePlatformClient from app.config import QueueNames, TaskNames from app.dao.jobs_dao import dao_get_job_by_id @@ -47,8 +47,9 @@ from app.models import ( NOTIFICATION_PENDING, NOTIFICATION_CREATED, KEY_TYPE_TEST, - MonthlyBilling) + MonthlyBilling, JOB_STATUS_FINISHED) from app.utils import get_london_midnight_in_utc +from app.v2.errors import JobIncompleteError from tests.app.db import create_notification, create_service, create_template, create_job, create_rate from tests.app.conftest import ( sample_job as create_sample_job, @@ -743,7 +744,6 @@ def test_run_letter_api_notifications_triggers_ftp_task(client, mocker, sample_l def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( - client, mocker, sample_letter_template, sample_letter_job, @@ -753,7 +753,7 @@ def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( sample_letter_template, job=sample_letter_job ) - pending_letter_notification = create_notification( + create_notification( sample_letter_template, status=NOTIFICATION_PENDING, api_key=sample_api_key @@ -770,3 +770,55 @@ def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( assert not mock_celery.called assert letter_job_notification.status == NOTIFICATION_CREATED assert test_api_key_notification.status == NOTIFICATION_CREATED + + +def test_check_job_status_task_raises_job_incomplete_error(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + create_notification(template=sample_template, job=job) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + + +def 
test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + + +def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + job_2 = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + with pytest.raises(expected_exception=JobIncompleteError) as e: + check_job_status() + assert str(job.id) in e.value.message + assert str(job_2.id) in e.value.message + + +def test_check_job_status_task_does_not_raise_error(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_FINISHED) + job_2 = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_FINISHED) + check_job_status() diff --git 
a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 3b787e05e..980622680 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -16,9 +16,7 @@ from app.celery import tasks from app.celery.tasks import ( s3, build_dvla_file, - check_job_status, create_dvla_file_contents_for_job, - update_dvla_job_to_error, process_job, process_row, send_sms, @@ -30,9 +28,6 @@ from app.config import QueueNames from app.dao import jobs_dao, services_dao from app.models import ( EMAIL_TYPE, - JOB_STATUS_ERROR, - JOB_STATUS_FINISHED, - JOB_STATUS_SENT_TO_DVLA, KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST, @@ -42,7 +37,6 @@ from app.models import ( Job, Notification ) -from app.v2.errors import JobIncompleteError from tests.app import load_example_csv from tests.app.conftest import ( @@ -100,7 +94,6 @@ def email_job_with_placeholders(notify_db, notify_db_session, sample_email_templ @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_sms_job(sample_job, mocker): - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -137,7 +130,6 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, service = create_sample_service(notify_db, notify_db_session, limit=9) job = create_sample_job(notify_db, notify_db_session, service=service, notification_count=10) - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -159,7 +151,6 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify create_sample_notification(notify_db, notify_db_session, service=service, 
job=job) - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -180,7 +171,6 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti create_sample_notification(notify_db, notify_db_session, service=service, job=job) - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -200,7 +190,6 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not template = create_sample_email_template(notify_db, notify_db_session, service=service) job = create_sample_job(notify_db, notify_db_session, service=service, template=template) - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -217,7 +206,6 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker): job = create_sample_job(notify_db, notify_db_session, job_status='scheduled') - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.build_dvla_file') @@ -237,7 +225,6 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, template = create_sample_email_template(notify_db, notify_db_session, service=service) job = create_sample_job(notify_db, notify_db_session, service=service, template=template, notification_count=10) - mocker.patch('app.celery.tasks.check_job_status.apply_async') 
mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -265,7 +252,6 @@ def test_should_process_email_job_if_exactly_on_send_limits(notify_db, def test_should_not_create_send_task_for_empty_file(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty')) mocker.patch('app.celery.tasks.send_sms.apply_async') - mocker.patch('app.celery.tasks.check_job_status.apply_async') process_job(sample_job.id) @@ -283,7 +269,6 @@ def test_should_process_email_job(email_job_with_placeholders, mocker): email_csv = """email_address,name test@test.com,foo """ - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=email_csv) mocker.patch('app.celery.tasks.send_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -322,7 +307,6 @@ def test_should_process_letter_job(sample_letter_job, mocker): process_row_mock = mocker.patch('app.celery.tasks.process_row') mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") mocker.patch('app.celery.tasks.build_dvla_file') - mocker.patch('app.celery.tasks.check_job_status.apply_async') process_job(sample_letter_job.id) @@ -353,7 +337,6 @@ def test_should_process_letter_job(sample_letter_job, mocker): def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mocker): - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) mocker.patch('app.celery.tasks.send_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") @@ -458,7 +441,6 @@ def test_should_put_send_sms_task_in_research_mode_queue_if_research_mode_servic notification = 
_notification_json(template, to="+447234123123") - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocked_deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() @@ -641,7 +623,6 @@ def test_should_not_build_dvla_file_in_research_mode_for_letter_job( csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name A1,A2,A3,A4,A_POST,Alice """ - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv) mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async') mocker.patch('app.celery.tasks.persist_letter.apply_async') @@ -664,7 +645,6 @@ def test_should_update_job_to_sent_to_dvla_in_research_mode_for_letter_job( csv = """address_line_1,address_line_2,address_line_3,address_line_4,postcode,name A1,A2,A3,A4,A_POST,Alice """ - mocker.patch('app.celery.tasks.check_job_status.apply_async') mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=csv) mock_update_job_task = mocker.patch('app.celery.tasks.update_job_to_sent_to_dvla.apply_async') mocker.patch('app.celery.tasks.persist_letter.apply_async') @@ -1224,25 +1204,3 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) mocked.call_count == 0 - - -def test_job_incomplete_raises_job_incomplete_error(sample_job): - assert sample_job.job_status != JOB_STATUS_FINISHED - with pytest.raises(JobIncompleteError) as e: - check_job_status(str(sample_job.id)) - - assert e.value.status_code == 500 - assert e.value.message == 'Job {} did not complete'.format(sample_job.id) - - -@pytest.mark.parametrize('job_status,notification_type', - [ - (JOB_STATUS_FINISHED, SMS_TYPE), - (JOB_STATUS_SENT_TO_DVLA, LETTER_TYPE) - ] -) -def test_job_finished_does_not_raises_job_incomplete_error( - sample_job, job_status, notification_type): - sample_job.job_status = 
job_status - sample_job.template.template_type = notification_type - check_job_status(str(sample_job.id)) From fd2a7d3341780d47734f1d9cb61477144d94cb56 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 12 Oct 2017 16:23:28 +0100 Subject: [PATCH 6/7] fix comment --- app/celery/scheduled_tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 79afb4153..4ad49a527 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -371,7 +371,7 @@ def check_job_status(): and scheduled_at or created_at is older that 30 minutes. if any results then raise error - process the rows in the csv thatgit c are missing (in another task) just do the check here. + process the rows in the csv that are missing (in another task) just do the check here. """ thirty_minutes_ago = datetime.utcnow() - timedelta(minutes=30) thirty_five_minutes_ago = datetime.utcnow() - timedelta(minutes=35) From 401e20985673931ee5f81f4dc812fa349cc489e7 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 12 Oct 2017 16:57:04 +0100 Subject: [PATCH 7/7] Removed the new queue name as it is not needed. 
--- app/config.py | 2 -- tests/app/test_config.py | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/app/config.py b/app/config.py index 26e8d47cf..cd9e3cda9 100644 --- a/app/config.py +++ b/app/config.py @@ -19,7 +19,6 @@ if os.environ.get('VCAP_SERVICES'): class QueueNames(object): - CHECK_JOBS = 'check-job-status-tasks' PERIODIC = 'periodic-tasks' PRIORITY = 'priority-tasks' DATABASE = 'database-tasks' @@ -35,7 +34,6 @@ class QueueNames(object): @staticmethod def all_queues(): return [ - QueueNames.CHECK_JOBS, QueueNames.PRIORITY, QueueNames.PERIODIC, QueueNames.DATABASE, diff --git a/tests/app/test_config.py b/tests/app/test_config.py index 0566bb6d6..3a40b4db4 100644 --- a/tests/app/test_config.py +++ b/tests/app/test_config.py @@ -63,7 +63,7 @@ def test_cloudfoundry_config_has_different_defaults(): def test_queue_names_all_queues_correct(): # Need to ensure that all_queues() only returns queue names used in API queues = QueueNames.all_queues() - assert len(queues) == 11 + assert len(queues) == 10 assert set([ QueueNames.PRIORITY, QueueNames.PERIODIC, @@ -74,6 +74,5 @@ def test_queue_names_all_queues_correct(): QueueNames.STATISTICS, QueueNames.JOBS, QueueNames.RETRY, - QueueNames.NOTIFY, - QueueNames.CHECK_JOBS, + QueueNames.NOTIFY ]) == set(queues)