From 0509669969562e1d7fbd1ca7991fdf0ae8414d72 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Fri, 13 Oct 2017 16:46:17 +0100 Subject: [PATCH] Process Incomplete Jobs - Added a new task to process incomplete jobs - Added tests to test the new method - Updated the check for incomplete jobs method to start the new task This will effectively resume tasks which for some reason were interrupted whilst they were being processed. In some cases only some of the csv was processed, this will find the place in the csv and continue processing from that point. --- app/celery/scheduled_tasks.py | 66 ++++++++- app/celery/tasks.py | 18 ++- app/config.py | 1 + tests/app/celery/test_scheduled_tasks.py | 178 ++++++++++++++++++++++- 4 files changed, 251 insertions(+), 12 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 422eb6d4d..5c7bb441d 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -5,12 +5,15 @@ from datetime import ( from celery.signals import worker_process_shutdown from flask import current_app +from notifications_utils.recipients import RecipientCSV from sqlalchemy import or_, and_ from sqlalchemy.exc import SQLAlchemyError from notifications_utils.s3 import s3upload from app.aws import s3 from app import notify_celery +from app.celery import celery +from app.dao.templates_dao import dao_get_template_by_id from app.performance_platform import total_sent_notifications, processing_time from app import performance_platform_client from app.dao.date_util import get_month_start_and_end_date_in_utc @@ -39,11 +42,22 @@ from app.dao.provider_details_dao import ( dao_toggle_sms_provider ) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago -from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \ - EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS +from app.models import ( + Job, + Notification, + LETTER_TYPE, + 
JOB_STATUS_READY_TO_SEND, + JOB_STATUS_IN_PROGRESS +) from app.notifications.process_notifications import send_notification_to_queue from app.statsd_decorators import statsd -from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications +from app.celery.tasks import ( + create_dvla_file_contents_for_notifications, + get_template_class, + process_job, + process_row, + job_complete +) from app.config import QueueNames, TaskNames from app.utils import convert_utc_to_bst from app.v2.errors import JobIncompleteError @@ -389,4 +403,50 @@ def check_job_status(): job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes] if job_ids: + notify_celery.send_task( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=(job_ids,), + queue=QueueNames.JOBS + ) raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids)) + + +@notify_celery.task(name='process-incomplete-jobs') +@statsd(namespace="tasks") +def process_incomplete_jobs(job_ids): + current_app.logger.info("Resuming Job(s) {}".format(job_ids)) + for job_id in job_ids: + process_incomplete_job(job_id) + + +def process_incomplete_job(job_id): + + job = Job.query.filter(Job.id == job_id).one() + + last_notification_added = Notification.query.filter( + Notification.job_id == job_id + ).order_by( + Notification.job_row_number.desc() + ).first() + + if last_notification_added: + resume_from_row = last_notification_added.job_row_number + else: + resume_from_row = -1 # The first row in the csv with a number is row 0 + + current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row)) + + db_template = dao_get_template_by_id(job.template_id, job.template_version) + + TemplateClass = get_template_class(db_template.template_type) + template = TemplateClass(db_template.__dict__) + + for row_number, recipient, personalisation in RecipientCSV( + s3.get_job_from_s3(str(job.service_id), str(job.id)), + template_type=template.template_type, + placeholders=template.placeholders 
+    ).enumerated_recipients_and_personalisation:
+        if row_number > resume_from_row:
+            process_row(row_number, recipient, personalisation, template, job, job.service)
+
+    job_complete(job, job.service, template, True)
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index 45aa480dd..0f38d2379 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -107,21 +107,31 @@ def process_job(job_id):
     ).enumerated_recipients_and_personalisation:
         process_row(row_number, recipient, personalisation, template, job, service)
 
+    job_complete(job, service, template, False, start)
+
+
+def job_complete(job, service, template, resumed, start=None):
     if template.template_type == LETTER_TYPE:
         if service.research_mode:
             update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
         else:
             build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
-            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
+            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job.id, QueueNames.JOBS))
     else:
         job.job_status = JOB_STATUS_FINISHED
         finished = datetime.utcnow()
         job.processing_finished = finished
         dao_update_job(job)
-        current_app.logger.info(
-            "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
-        )
+
+        if resumed:
+            current_app.logger.info(
+                "Resumed Job {} completed at {}".format(job.id, finished)
+            )
+        else:
+            current_app.logger.info(
+                "Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
+            )
 
 
 def process_row(row_number, recipient, personalisation, template, job, service):
diff --git a/app/config.py b/app/config.py
index cd9e3cda9..aa74a4a84 100644
--- a/app/config.py
+++ b/app/config.py
@@ -50,6 +50,7 @@ class QueueNames(object):
 
 class TaskNames(object):
     DVLA_JOBS = 'send-jobs-to-dvla'
     DVLA_NOTIFICATIONS = 'send-api-notifications-to-dvla'
+    PROCESS_INCOMPLETE_JOBS = 
'process-incomplete-jobs' class Config(object): diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 4c69f7ba8..090245ec1 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -29,7 +29,8 @@ from app.celery.scheduled_tasks import ( timeout_job_statistics, timeout_notifications, populate_monthly_billing, - send_total_sent_notifications_to_performance_platform, check_job_status) + send_total_sent_notifications_to_performance_platform, check_job_status, process_incomplete_job, + process_incomplete_jobs) from app.clients.performance_platform.performance_platform_client import PerformancePlatformClient from app.config import QueueNames, TaskNames from app.dao.jobs_dao import dao_get_job_by_id @@ -47,9 +48,10 @@ from app.models import ( NOTIFICATION_PENDING, NOTIFICATION_CREATED, KEY_TYPE_TEST, - MonthlyBilling, JOB_STATUS_FINISHED) + MonthlyBilling, JOB_STATUS_FINISHED, Job, Notification) from app.utils import get_london_midnight_in_utc from app.v2.errors import JobIncompleteError +from tests.app import load_example_csv from tests.app.db import create_notification, create_service, create_template, create_job, create_rate from tests.app.conftest import ( sample_job as create_sample_job, @@ -772,7 +774,8 @@ def test_run_letter_api_notifications_does_nothing_if_no_created_notifications( assert test_api_key_notification.status == NOTIFICATION_CREATED -def test_check_job_status_task_raises_job_incomplete_error(sample_template): +def test_check_job_status_task_raises_job_incomplete_error(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') job = create_job(template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), @@ -782,8 +785,15 @@ def test_check_job_status_task_raises_job_incomplete_error(sample_template): check_job_status() 
assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) -def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(sample_template): + +def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') job = create_job(template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), @@ -793,8 +803,15 @@ def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is check_job_status() assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id)) + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id)],), + queue=QueueNames.JOBS + ) -def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(sample_template): + +def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(mocker, sample_template): + mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task') job = create_job(template=sample_template, notification_count=3, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), @@ -810,6 +827,12 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(sam assert str(job.id) in e.value.message assert str(job_2.id) in e.value.message + mock_celery.assert_called_once_with( + name=TaskNames.PROCESS_INCOMPLETE_JOBS, + args=([str(job.id), str(job_2.id)],), + queue=QueueNames.JOBS + ) + def test_check_job_status_task_does_not_raise_error(sample_template): job = create_job(template=sample_template, notification_count=3, @@ -821,4 +844,149 @@ def 
test_check_job_status_task_does_not_raise_error(sample_template): created_at=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_FINISHED) + check_job_status() + + +def test_process_incomplete_job(mocker, sample_template): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + create_notification(sample_template, job, 0) + create_notification(sample_template, job, 1) + + assert Notification.query.filter(Notification.job_id == job.id).count() == 2 + + process_incomplete_job(str(job.id)) + + completed_job = Job.query.filter(Job.id == job.id).one() + + assert completed_job.job_status == JOB_STATUS_FINISHED + + assert send_sms.call_count == 8 # There are 10 in the file and we've added two already + + +def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + create_notification(sample_template, job, 0) + create_notification(sample_template, job, 1) + create_notification(sample_template, job, 2) + create_notification(sample_template, job, 3) + create_notification(sample_template, job, 4) + create_notification(sample_template, 
job, 5)
+    create_notification(sample_template, job, 6)
+    create_notification(sample_template, job, 7)
+    create_notification(sample_template, job, 8)
+    create_notification(sample_template, job, 9)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 10
+
+    process_incomplete_job(str(job.id))
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 0  # All 10 rows in the file already have notifications so nothing new is sent
+
+
+def test_process_incomplete_jobs(mocker, sample_template):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    job = create_job(template=sample_template, notification_count=3,
+                     created_at=datetime.utcnow() - timedelta(hours=2),
+                     scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                     processing_started=datetime.utcnow() - timedelta(minutes=31),
+                     job_status=JOB_STATUS_IN_PROGRESS)
+    create_notification(sample_template, job, 0)
+    create_notification(sample_template, job, 1)
+    create_notification(sample_template, job, 2)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 3
+
+    job2 = create_job(template=sample_template, notification_count=3,
+                      created_at=datetime.utcnow() - timedelta(hours=2),
+                      scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                      processing_started=datetime.utcnow() - timedelta(minutes=31),
+                      job_status=JOB_STATUS_IN_PROGRESS)
+
+    create_notification(sample_template, job2, 0)
+    create_notification(sample_template, job2, 1)
+    create_notification(sample_template, job2, 2)
+    create_notification(sample_template, job2, 3)
+    create_notification(sample_template, job2, 4)
+
+    assert Notification.query.filter(Notification.job_id == job2.id).count() == 5
+
+    jobs = [job.id, job2.id]
+    process_incomplete_jobs(jobs)
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+
completed_job2 = Job.query.filter(Job.id == job2.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert completed_job2.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 12  # There are 20 in total over 2 jobs and we've added 8 already
+
+
+def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template):
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    job = create_job(template=sample_template, notification_count=3,
+                     created_at=datetime.utcnow() - timedelta(hours=2),
+                     scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                     processing_started=datetime.utcnow() - timedelta(minutes=31),
+                     job_status=JOB_STATUS_IN_PROGRESS)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 0
+
+    process_incomplete_job(job.id)
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 10  # There are 10 in the csv file
+
+
+def test_process_incomplete_jobs_with_empty_job_list(mocker):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    jobs = []
+    process_incomplete_jobs(jobs)
+
+    assert send_sms.call_count == 0  # No jobs were given so no notifications should be sent
+
+
+def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    with pytest.raises(expected_exception=Exception) as e:
+        process_incomplete_job(fake_uuid)
+
+    assert send_sms.call_count == 0  # The job cannot be found so no notifications should be sent