diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 5c7bb441d..e9667e755 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -5,15 +5,12 @@ from datetime import ( from celery.signals import worker_process_shutdown from flask import current_app -from notifications_utils.recipients import RecipientCSV -from sqlalchemy import or_, and_ +from sqlalchemy import and_ from sqlalchemy.exc import SQLAlchemyError from notifications_utils.s3 import s3upload from app.aws import s3 from app import notify_celery -from app.celery import celery -from app.dao.templates_dao import dao_get_template_by_id from app.performance_platform import total_sent_notifications, processing_time from app import performance_platform_client from app.dao.date_util import get_month_start_and_end_date_in_utc @@ -22,8 +19,8 @@ from app.dao.invited_user_dao import delete_invitations_created_more_than_two_da from app.dao.jobs_dao import ( dao_get_letter_job_ids_by_status, dao_set_scheduled_jobs_to_pending, - dao_get_jobs_older_than_limited_by, - dao_get_job_by_id) + dao_get_jobs_older_than_limited_by +) from app.dao.monthly_billing_dao import ( get_service_ids_that_need_billing_populated, create_or_update_monthly_billing @@ -44,7 +41,6 @@ from app.dao.provider_details_dao import ( from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.models import ( Job, - Notification, LETTER_TYPE, JOB_STATUS_READY_TO_SEND, JOB_STATUS_IN_PROGRESS @@ -53,10 +49,7 @@ from app.notifications.process_notifications import send_notification_to_queue from app.statsd_decorators import statsd from app.celery.tasks import ( create_dvla_file_contents_for_notifications, - get_template_class, - process_job, - process_row, - job_complete + process_job ) from app.config import QueueNames, TaskNames from app.utils import convert_utc_to_bst @@ -409,44 +402,3 @@ def check_job_status(): queue=QueueNames.JOBS ) raise JobIncompleteError("Job(s) {} have not completed.".format(job_ids)) - - -@notify_celery.task(name='process-incomplete-jobs') -@statsd(namespace="tasks") -def process_incomplete_jobs(job_ids): - current_app.logger.info("Resuming Job(s) {}".format(job_ids)) - for job_id in job_ids: - process_incomplete_job(job_id) - - -def process_incomplete_job(job_id): - - job = Job.query.filter(Job.id == job_id).one() - - last_notification_added = Notification.query.filter( - Notification.job_id == job_id - ).order_by( - Notification.job_row_number.desc() - ).first() - - if last_notification_added: - resume_from_row = last_notification_added.job_row_number - else: - resume_from_row = -1 # The first row in the csv with a number is row 0 - - current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row)) - - db_template = dao_get_template_by_id(job.template_id, job.template_version) - - TemplateClass = get_template_class(db_template.template_type) - template = TemplateClass(db_template.__dict__) - - for row_number, recipient, personalisation in RecipientCSV( - s3.get_job_from_s3(str(job.service_id), str(job.id)), - template_type=template.template_type, - placeholders=template.placeholders - ).enumerated_recipients_and_personalisation: - if row_number > resume_from_row: - process_row(row_number, recipient, personalisation, template, job, job.service) - - job_complete(job, job.service, template, True) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 0f38d2379..b879f806a 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -2,7 +2,7 @@ import json from datetime import datetime from collections import namedtuple -from celery.signals import worker_process_init, worker_process_shutdown +from celery.signals import worker_process_shutdown from flask import current_app from notifications_utils.recipients import ( RecipientCSV @@ -44,9 +44,9 @@ from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count from app.dao.templates_dao import dao_get_template_by_id from app.models import ( + Job, + Notification, EMAIL_TYPE, - SMS_TYPE, - LETTER_TYPE, KEY_TYPE_NORMAL, JOB_STATUS_CANCELLED, JOB_STATUS_PENDING, @@ -54,8 +54,10 @@ from app.models import ( JOB_STATUS_FINISHED, JOB_STATUS_READY_TO_SEND, JOB_STATUS_SENT_TO_DVLA, JOB_STATUS_ERROR, + LETTER_TYPE, NOTIFICATION_SENDING, - NOTIFICATION_TECHNICAL_FAILURE + NOTIFICATION_TECHNICAL_FAILURE, + SMS_TYPE, ) from app.notifications.process_notifications import persist_notification from app.service.utils import service_allowed_to_send_to @@ -492,3 +494,44 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): service_id, inbound_api.url, e)) except self.MaxRetriesExceededError: current_app.logger.exception('Retry: send_inbound_sms_to_service has retried the max number of times') + + +@notify_celery.task(name='process-incomplete-jobs') +@statsd(namespace="tasks") +def process_incomplete_jobs(job_ids): + current_app.logger.info("Resuming Job(s) {}".format(job_ids)) + for job_id in job_ids: + process_incomplete_job(job_id) + + +def process_incomplete_job(job_id): + + job = Job.query.filter(Job.id == job_id).one() + + last_notification_added = Notification.query.filter( + Notification.job_id == job_id + ).order_by( + Notification.job_row_number.desc() + ).first() + + if last_notification_added: + resume_from_row = last_notification_added.job_row_number + else: + resume_from_row = -1 # The first row in the csv with a number is row 0 + + current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row)) + + db_template = dao_get_template_by_id(job.template_id, job.template_version) + + TemplateClass = get_template_class(db_template.template_type) + template = TemplateClass(db_template.__dict__) + + for row_number, recipient, personalisation in RecipientCSV( + s3.get_job_from_s3(str(job.service_id), str(job.id)), + template_type=template.template_type, + placeholders=template.placeholders + ).enumerated_recipients_and_personalisation: + if row_number > resume_from_row: + process_row(row_number, recipient, personalisation, template, job, job.service) + + job_complete(job, job.service, template, True) diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index c7f678b8a..ba87a94de 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -9,6 +9,7 @@ from freezegun import freeze_time from app.celery import scheduled_tasks from app.celery.scheduled_tasks import ( + check_job_status, delete_dvla_response_files_older_than_seven_days, delete_email_notifications_older_than_seven_days, delete_inbound_sms_older_than_seven_days, @@ -22,15 +23,15 @@ from app.celery.scheduled_tasks import ( run_scheduled_jobs, run_letter_jobs, run_letter_api_notifications, + populate_monthly_billing, s3, send_daily_performance_platform_stats, send_scheduled_notifications, + send_total_sent_notifications_to_performance_platform, switch_current_sms_provider_on_slow_delivery, timeout_job_statistics, - timeout_notifications, - populate_monthly_billing, - send_total_sent_notifications_to_performance_platform, check_job_status, process_incomplete_job, - process_incomplete_jobs) + timeout_notifications +) from app.clients.performance_platform.performance_platform_client import PerformancePlatformClient from app.config import QueueNames, TaskNames from app.dao.jobs_dao import dao_get_job_by_id @@ -48,10 +49,10 @@ from app.models import ( NOTIFICATION_PENDING, NOTIFICATION_CREATED, KEY_TYPE_TEST, - MonthlyBilling, JOB_STATUS_FINISHED, Job, Notification) + MonthlyBilling +) from app.utils import get_london_midnight_in_utc from app.v2.errors import JobIncompleteError -from tests.app import load_example_csv from tests.app.db import create_notification, create_service, create_template, create_job, create_rate from tests.app.conftest import ( sample_job as create_sample_job, @@ -830,164 +831,6 @@ def test_check_job_status_task_raises_job_incomplete_error_for_multiple_jobs(moc mock_celery.assert_called_once_with( name=TaskNames.PROCESS_INCOMPLETE_JOBS, - args=([str(job.id), str(job_2.id)],), + args=([str(job_2.id), str(job.id)],), queue=QueueNames.JOBS ) - - -def test_check_job_status_task_does_not_raise_error(sample_template): - job = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_FINISHED) - job_2 = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_FINISHED) - - check_job_status() - - -def test_process_incomplete_job(mocker, sample_template): - - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') - - job = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - - create_notification(sample_template, job, 0) - create_notification(sample_template, job, 1) - - assert Notification.query.filter(Notification.job_id == job.id).count() == 2 - - process_incomplete_job(str(job.id)) - - completed_job = Job.query.filter(Job.id == job.id).one() - - assert completed_job.job_status == JOB_STATUS_FINISHED - - assert send_sms.call_count == 8 # There are 10 in the file and we've added two already - - -def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): - - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') - - job = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - - create_notification(sample_template, job, 0) - create_notification(sample_template, job, 1) - create_notification(sample_template, job, 2) - create_notification(sample_template, job, 3) - create_notification(sample_template, job, 4) - create_notification(sample_template, job, 5) - create_notification(sample_template, job, 6) - create_notification(sample_template, job, 7) - create_notification(sample_template, job, 8) - create_notification(sample_template, job, 9) - - assert Notification.query.filter(Notification.job_id == job.id).count() == 10 - - process_incomplete_job(str(job.id)) - - completed_job = Job.query.filter(Job.id == job.id).one() - - assert completed_job.job_status == JOB_STATUS_FINISHED - - assert send_sms.call_count == 0 # There are 10 in the file and we've added two already - - -def test_process_incomplete_jobs(mocker, sample_template): - - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') - - job = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - create_notification(sample_template, job, 0) - create_notification(sample_template, job, 1) - create_notification(sample_template, job, 2) - - assert Notification.query.filter(Notification.job_id == job.id).count() == 3 - - job2 = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - - create_notification(sample_template, job2, 0) - create_notification(sample_template, job2, 1) - create_notification(sample_template, job2, 2) - create_notification(sample_template, job2, 3) - create_notification(sample_template, job2, 4) - - assert Notification.query.filter(Notification.job_id == job2.id).count() == 5 - - jobs = [job.id, job2.id] - process_incomplete_jobs(jobs) - - completed_job = Job.query.filter(Job.id == job.id).one() - completed_job2 = Job.query.filter(Job.id == job2.id).one() - - assert completed_job.job_status == JOB_STATUS_FINISHED - - assert completed_job2.job_status == JOB_STATUS_FINISHED - - assert send_sms.call_count == 12 # There are 20 in total over 2 jobs we've added 8 already - - -def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') - - job = create_job(template=sample_template, notification_count=3, - created_at=datetime.utcnow() - timedelta(hours=2), - scheduled_for=datetime.utcnow() - timedelta(minutes=31), - processing_started=datetime.utcnow() - timedelta(minutes=31), - job_status=JOB_STATUS_IN_PROGRESS) - - assert Notification.query.filter(Notification.job_id == job.id).count() == 0 - - process_incomplete_job(job.id) - - completed_job = Job.query.filter(Job.id == job.id).one() - - assert completed_job.job_status == JOB_STATUS_FINISHED - - assert send_sms.call_count == 10 # There are 10 in the csv file - - -def test_process_incomplete_jobs(mocker): - - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') - - jobs = [] - process_incomplete_jobs(jobs) - - assert send_sms.call_count == 0 # There are 20 in total over 2 jobs we've added 8 already - - -def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid): - - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') - - with pytest.raises(expected_exception=Exception) as e: - process_incomplete_job(fake_uuid) - - assert send_sms.call_count == 0 # There are 20 in total over 2 jobs we've added 8 already diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 980622680..a25851d1b 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,6 +1,6 @@ import json import uuid -from datetime import datetime +from datetime import datetime, timedelta from unittest.mock import Mock import pytest import requests_mock @@ -13,8 +13,8 @@ from celery.exceptions import Retry from app import (encryption, DATETIME_FORMAT) from app.celery import provider_tasks from app.celery import tasks +from app.celery.scheduled_tasks import check_job_status from app.celery.tasks import ( - s3, build_dvla_file, create_dvla_file_contents_for_job, process_job, @@ -22,20 +22,26 @@ from app.celery.tasks import ( send_sms, send_email, persist_letter, + process_incomplete_job, + process_incomplete_jobs, get_template_class, - send_inbound_sms_to_service) + s3, + send_inbound_sms_to_service +) from app.config import QueueNames from app.dao import jobs_dao, services_dao from app.models import ( + Job, + Notification, EMAIL_TYPE, + JOB_STATUS_FINISHED, + JOB_STATUS_IN_PROGRESS, KEY_TYPE_NORMAL, KEY_TYPE_TEAM, KEY_TYPE_TEST, LETTER_TYPE, SERVICE_PERMISSION_TYPES, - SMS_TYPE, - Job, - Notification + SMS_TYPE ) from tests.app import load_example_csv @@ -1204,3 +1210,161 @@ def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(not send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) mocked.call_count == 0 + + +def test_check_job_status_task_does_not_raise_error(sample_template): + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_FINISHED) + job_2 = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_FINISHED) + + check_job_status() + + +def test_process_incomplete_job(mocker, sample_template): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + create_notification(sample_template, job, 0) + create_notification(sample_template, job, 1) + + assert Notification.query.filter(Notification.job_id == job.id).count() == 2 + + process_incomplete_job(str(job.id)) + + completed_job = Job.query.filter(Job.id == job.id).one() + + assert completed_job.job_status == JOB_STATUS_FINISHED + + assert send_sms.call_count == 8 # There are 10 in the file and we've added two already + + +def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mock_send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + create_notification(sample_template, job, 0) + create_notification(sample_template, job, 1) + create_notification(sample_template, job, 2) + create_notification(sample_template, job, 3) + create_notification(sample_template, job, 4) + create_notification(sample_template, job, 5) + create_notification(sample_template, job, 6) + create_notification(sample_template, job, 7) + create_notification(sample_template, job, 8) + create_notification(sample_template, job, 9) + + assert Notification.query.filter(Notification.job_id == job.id).count() == 10 + + process_incomplete_job(str(job.id)) + + completed_job = Job.query.filter(Job.id == job.id).one() + + assert completed_job.job_status == JOB_STATUS_FINISHED + + assert mock_send_sms.call_count == 0 # There are 10 in the file and we've added two already + + +def test_process_incomplete_jobs(mocker, sample_template): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mock_send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + create_notification(sample_template, job, 0) + create_notification(sample_template, job, 1) + create_notification(sample_template, job, 2) + + assert Notification.query.filter(Notification.job_id == job.id).count() == 3 + + job2 = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + create_notification(sample_template, job2, 0) + create_notification(sample_template, job2, 1) + create_notification(sample_template, job2, 2) + create_notification(sample_template, job2, 3) + create_notification(sample_template, job2, 4) + + assert Notification.query.filter(Notification.job_id == job2.id).count() == 5 + + jobs = [job.id, job2.id] + process_incomplete_jobs(jobs) + + completed_job = Job.query.filter(Job.id == job.id).one() + completed_job2 = Job.query.filter(Job.id == job2.id).one() + + assert completed_job.job_status == JOB_STATUS_FINISHED + + assert completed_job2.job_status == JOB_STATUS_FINISHED + + assert mock_send_sms.call_count == 12 # There are 20 in total over 2 jobs we've added 8 already + + +def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template): + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mock_send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + job = create_job(template=sample_template, notification_count=3, + created_at=datetime.utcnow() - timedelta(hours=2), + scheduled_for=datetime.utcnow() - timedelta(minutes=31), + processing_started=datetime.utcnow() - timedelta(minutes=31), + job_status=JOB_STATUS_IN_PROGRESS) + + assert Notification.query.filter(Notification.job_id == job.id).count() == 0 + + process_incomplete_job(job.id) + + completed_job = Job.query.filter(Job.id == job.id).one() + + assert completed_job.job_status == JOB_STATUS_FINISHED + + assert mock_send_sms.call_count == 10 # There are 10 in the csv file + + +def test_process_incomplete_jobs(mocker): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mock_send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + jobs = [] + process_incomplete_jobs(jobs) + + assert mock_send_sms.call_count == 0 # There are 20 in total over 2 jobs we've added 8 already + + +def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid): + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mock_send_sms = mocker.patch('app.celery.tasks.send_sms.apply_async') + + with pytest.raises(expected_exception=Exception) as e: + process_incomplete_job(fake_uuid) + + assert mock_send_sms.call_count == 0 # There are 20 in total over 2 jobs we've added 8 already