diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index 422eb6d4d..5c7bb441d 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -5,12 +5,15 @@ from datetime import (
 
 from celery.signals import worker_process_shutdown
 from flask import current_app
+from notifications_utils.recipients import RecipientCSV
 from sqlalchemy import or_, and_
 from sqlalchemy.exc import SQLAlchemyError
 from notifications_utils.s3 import s3upload
 
 from app.aws import s3
 from app import notify_celery
+from app.celery import celery
+from app.dao.templates_dao import dao_get_template_by_id
 from app.performance_platform import total_sent_notifications, processing_time
 from app import performance_platform_client
 from app.dao.date_util import get_month_start_and_end_date_in_utc
@@ -39,11 +42,22 @@ from app.dao.provider_details_dao import (
     dao_toggle_sms_provider
 )
 from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
-from app.models import LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_FINISHED, Job, \
-    EMAIL_TYPE, SMS_TYPE, JOB_STATUS_IN_PROGRESS
+from app.models import (
+    Job,
+    Notification,
+    LETTER_TYPE,
+    JOB_STATUS_READY_TO_SEND,
+    JOB_STATUS_IN_PROGRESS
+)
 from app.notifications.process_notifications import send_notification_to_queue
 from app.statsd_decorators import statsd
-from app.celery.tasks import process_job, create_dvla_file_contents_for_notifications
+from app.celery.tasks import (
+    create_dvla_file_contents_for_notifications,
+    get_template_class,
+    process_job,
+    process_row,
+    job_complete
+)
 from app.config import QueueNames, TaskNames
 from app.utils import convert_utc_to_bst
 from app.v2.errors import JobIncompleteError
@@ -389,4 +403,50 @@ def check_job_status():
 
     job_ids = [str(x.id) for x in jobs_not_complete_after_30_minutes]
     if job_ids:
+        notify_celery.send_task(
+            name=TaskNames.PROCESS_INCOMPLETE_JOBS,
+            args=(job_ids,),
+            queue=QueueNames.JOBS
+        )
         raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids))
+
+
+@notify_celery.task(name='process-incomplete-jobs')
+@statsd(namespace="tasks")
+def process_incomplete_jobs(job_ids):
+    current_app.logger.info("Resuming Job(s) {}".format(job_ids))
+    for job_id in job_ids:
+        process_incomplete_job(job_id)
+
+
+def process_incomplete_job(job_id):
+
+    job = Job.query.filter(Job.id == job_id).one()
+
+    last_notification_added = Notification.query.filter(
+        Notification.job_id == job_id
+    ).order_by(
+        Notification.job_row_number.desc()
+    ).first()
+
+    if last_notification_added:
+        resume_from_row = last_notification_added.job_row_number
+    else:
+        resume_from_row = -1  # The first row in the csv with a number is row 0
+
+    current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
+
+    db_template = dao_get_template_by_id(job.template_id, job.template_version)
+
+    TemplateClass = get_template_class(db_template.template_type)
+    template = TemplateClass(db_template.__dict__)
+
+    for row_number, recipient, personalisation in RecipientCSV(
+            s3.get_job_from_s3(str(job.service_id), str(job.id)),
+            template_type=template.template_type,
+            placeholders=template.placeholders
+    ).enumerated_recipients_and_personalisation:
+        if row_number > resume_from_row:
+            process_row(row_number, recipient, personalisation, template, job, job.service)
+
+    job_complete(job, job.service, template, True)
diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index 45aa480dd..0f38d2379 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -107,21 +107,31 @@ def process_job(job_id):
     ).enumerated_recipients_and_personalisation:
         process_row(row_number, recipient, personalisation, template, job, service)
 
+    job_complete(job, service, template, False, start)
+
+
+def job_complete(job, service, template, resumed, start=None):
     if template.template_type == LETTER_TYPE:
         if service.research_mode:
             update_job_to_sent_to_dvla.apply_async([str(job.id)], queue=QueueNames.RESEARCH_MODE)
         else:
             build_dvla_file.apply_async([str(job.id)], queue=QueueNames.JOBS)
-            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job_id, QueueNames.JOBS))
+            current_app.logger.info("send job {} to build-dvla-file in the {} queue".format(job.id, QueueNames.JOBS))
     else:
         job.job_status = JOB_STATUS_FINISHED
 
     finished = datetime.utcnow()
     job.processing_finished = finished
     dao_update_job(job)
-    current_app.logger.info(
-        "Job {} created at {} started at {} finished at {}".format(job_id, job.created_at, start, finished)
-    )
+
+    if resumed:
+        current_app.logger.info(
+            "Resumed Job {} completed at {}".format(job.id, finished)
+        )
+    else:
+        current_app.logger.info(
+            "Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
+        )
 
 
 def process_row(row_number, recipient, personalisation, template, job, service):
diff --git a/app/config.py b/app/config.py
index cd9e3cda9..aa74a4a84 100644
--- a/app/config.py
+++ b/app/config.py
@@ -50,6 +50,7 @@ class QueueNames(object):
 class TaskNames(object):
     DVLA_JOBS = 'send-jobs-to-dvla'
     DVLA_NOTIFICATIONS = 'send-api-notifications-to-dvla'
+    PROCESS_INCOMPLETE_JOBS = 'process-incomplete-jobs'
 
 
 class Config(object):
diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py
index 4c69f7ba8..090245ec1 100644
--- a/tests/app/celery/test_scheduled_tasks.py
+++ b/tests/app/celery/test_scheduled_tasks.py
@@ -29,7 +29,8 @@ from app.celery.scheduled_tasks import (
     timeout_job_statistics,
     timeout_notifications,
     populate_monthly_billing,
-    send_total_sent_notifications_to_performance_platform, check_job_status)
+    send_total_sent_notifications_to_performance_platform, check_job_status, process_incomplete_job,
+    process_incomplete_jobs)
 from app.clients.performance_platform.performance_platform_client import PerformancePlatformClient
 from app.config import QueueNames, TaskNames
 from app.dao.jobs_dao import dao_get_job_by_id
@@ -47,9 +48,10 @@ from app.models import (
     NOTIFICATION_PENDING,
     NOTIFICATION_CREATED,
     KEY_TYPE_TEST,
-    MonthlyBilling, JOB_STATUS_FINISHED)
+    MonthlyBilling, JOB_STATUS_FINISHED, Job, Notification)
 from app.utils import get_london_midnight_in_utc
 from app.v2.errors import JobIncompleteError
+from tests.app import load_example_csv
 from tests.app.db import create_notification, create_service, create_template, create_job, create_rate
 from tests.app.conftest import (
     sample_job as create_sample_job,
@@ -772,7 +774,8 @@ def test_run_letter_api_notifications_does_nothing_if_no_created_notifications(
     assert test_api_key_notification.status == NOTIFICATION_CREATED
 
 
-def test_check_job_status_task_raises_job_incomplete_error(sample_template):
+def test_check_job_status_task_raises_job_incomplete_error(mocker, sample_template):
+    mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
     job = create_job(template=sample_template, notification_count=3,
                      created_at=datetime.utcnow() - timedelta(minutes=31),
                      processing_started=datetime.utcnow() - timedelta(minutes=31),
@@ -782,8 +785,15 @@ def test_check_job_status_task_raises_job_incomplete_error(sample_template):
         check_job_status()
     assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id))
 
+    mock_celery.assert_called_once_with(
+        name=TaskNames.PROCESS_INCOMPLETE_JOBS,
+        args=([str(job.id)],),
+        queue=QueueNames.JOBS
+    )
 
-def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(sample_template):
+
+def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is_not_complete(mocker, sample_template):
+    mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
     job = create_job(template=sample_template, notification_count=3,
                      created_at=datetime.utcnow() - timedelta(hours=2),
                      scheduled_for=datetime.utcnow() - timedelta(minutes=31),
@@ -793,8 +803,15 @@ def test_check_job_status_task_raises_job_incomplete_error_when_scheduled_job_is
         check_job_status()
     assert e.value.message == "Job(s) ['{}'] have not completed.".format(str(job.id))
 
+    mock_celery.assert_called_once_with(
+        name=TaskNames.PROCESS_INCOMPLETE_JOBS,
+        args=([str(job.id)],),
+        queue=QueueNames.JOBS
+    )
 
-def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(sample_template):
+
+def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(mocker, sample_template):
+    mock_celery = mocker.patch('app.celery.tasks.notify_celery.send_task')
     job = create_job(template=sample_template, notification_count=3,
                      created_at=datetime.utcnow() - timedelta(hours=2),
                      scheduled_for=datetime.utcnow() - timedelta(minutes=31),
@@ -810,6 +827,12 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(sam
     assert str(job.id) in e.value.message
     assert str(job_2.id) in e.value.message
 
+    mock_celery.assert_called_once_with(
+        name=TaskNames.PROCESS_INCOMPLETE_JOBS,
+        args=([str(job.id), str(job_2.id)],),
+        queue=QueueNames.JOBS
+    )
+
 
 def test_check_job_status_task_does_not_raise_error(sample_template):
     job = create_job(template=sample_template, notification_count=3,
@@ -821,4 +844,149 @@ def test_check_job_status_task_does_not_raise_error(sample_template):
                        created_at=datetime.utcnow() - timedelta(minutes=31),
                        processing_started=datetime.utcnow() - timedelta(minutes=31),
                        job_status=JOB_STATUS_FINISHED)
 
+    check_job_status()
+
+
+def test_process_incomplete_job(mocker, sample_template):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    job = create_job(template=sample_template, notification_count=3,
+                     created_at=datetime.utcnow() - timedelta(hours=2),
+                     scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                     processing_started=datetime.utcnow() - timedelta(minutes=31),
+                     job_status=JOB_STATUS_IN_PROGRESS)
+
+    create_notification(sample_template, job, 0)
+    create_notification(sample_template, job, 1)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 2
+
+    process_incomplete_job(str(job.id))
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 8  # There are 10 rows in the file and we've already added two
+
+
+def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    job = create_job(template=sample_template, notification_count=3,
+                     created_at=datetime.utcnow() - timedelta(hours=2),
+                     scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                     processing_started=datetime.utcnow() - timedelta(minutes=31),
+                     job_status=JOB_STATUS_IN_PROGRESS)
+
+    create_notification(sample_template, job, 0)
+    create_notification(sample_template, job, 1)
+    create_notification(sample_template, job, 2)
+    create_notification(sample_template, job, 3)
+    create_notification(sample_template, job, 4)
+    create_notification(sample_template, job, 5)
+    create_notification(sample_template, job, 6)
+    create_notification(sample_template, job, 7)
+    create_notification(sample_template, job, 8)
+    create_notification(sample_template, job, 9)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 10
+
+    process_incomplete_job(str(job.id))
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 0  # All 10 rows in the file already have notifications
+
+
+def test_process_incomplete_jobs(mocker, sample_template):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    job = create_job(template=sample_template, notification_count=3,
+                     created_at=datetime.utcnow() - timedelta(hours=2),
+                     scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                     processing_started=datetime.utcnow() - timedelta(minutes=31),
+                     job_status=JOB_STATUS_IN_PROGRESS)
+    create_notification(sample_template, job, 0)
+    create_notification(sample_template, job, 1)
+    create_notification(sample_template, job, 2)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 3
+
+    job2 = create_job(template=sample_template, notification_count=3,
+                      created_at=datetime.utcnow() - timedelta(hours=2),
+                      scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                      processing_started=datetime.utcnow() - timedelta(minutes=31),
+                      job_status=JOB_STATUS_IN_PROGRESS)
+
+    create_notification(sample_template, job2, 0)
+    create_notification(sample_template, job2, 1)
+    create_notification(sample_template, job2, 2)
+    create_notification(sample_template, job2, 3)
+    create_notification(sample_template, job2, 4)
+
+    assert Notification.query.filter(Notification.job_id == job2.id).count() == 5
+
+    jobs = [job.id, job2.id]
+    process_incomplete_jobs(jobs)
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+    completed_job2 = Job.query.filter(Job.id == job2.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert completed_job2.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 12  # There are 20 rows in total across the 2 jobs and we've already added 8
+
+
+def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template):
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    job = create_job(template=sample_template, notification_count=3,
+                     created_at=datetime.utcnow() - timedelta(hours=2),
+                     scheduled_for=datetime.utcnow() - timedelta(minutes=31),
+                     processing_started=datetime.utcnow() - timedelta(minutes=31),
+                     job_status=JOB_STATUS_IN_PROGRESS)
+
+    assert Notification.query.filter(Notification.job_id == job.id).count() == 0
+
+    process_incomplete_job(job.id)
+
+    completed_job = Job.query.filter(Job.id == job.id).one()
+
+    assert completed_job.job_status == JOB_STATUS_FINISHED
+
+    assert send_sms.call_count == 10  # There are 10 rows in the csv file and none had been sent
+
+
+def test_process_incomplete_jobs_with_no_jobs(mocker):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    jobs = []
+    process_incomplete_jobs(jobs)
+
+    assert send_sms.call_count == 0  # No jobs to resume, so nothing should be sent
+
+
+def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid):
+
+    mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms'))
+    send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async')
+
+    with pytest.raises(expected_exception=Exception) as e:
+        process_incomplete_job(fake_uuid)
+
+    assert send_sms.call_count == 0  # The job lookup fails, so nothing should be sent