From 14d621d243dbf8e934d407dab117565dd837edab Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Wed, 9 Mar 2016 11:28:52 +0000 Subject: [PATCH] Job processing respects sendlimits - If a job starts it MUST be able to fit into the days sending limits - So if service limit is 10, and we've sent 5 messages and the current job is 4 then it's OK. - If the job is 6 then it's over the limit and it should fail - Job should NOT start if can't complete in the limit --- app/celery/tasks.py | 35 +++++++++++++++++++++++++---- tests/app/celery/test_tasks.py | 41 +++++++++++++++++++++++++++++++++- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index c4cd157c8..ccc2248ad 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,10 +1,14 @@ -from app import create_uuid, DATETIME_FORMAT +from app import create_uuid, DATETIME_FORMAT, DATE_FORMAT from app import notify_celery, encryption, firetext_client, aws_ses_client from app.clients.email.aws_ses import AwsSesClientException from app.clients.sms.firetext import FiretextClientException from app.dao.services_dao import dao_fetch_service_by_id from app.dao.templates_dao import dao_get_template_by_id -from app.dao.notifications_dao import dao_create_notification, dao_update_notification +from app.dao.notifications_dao import ( + dao_create_notification, + dao_update_notification, + dao_get_notification_statistics_for_service_and_day +) from app.dao.jobs_dao import dao_update_job, dao_get_job_by_id from app.models import Notification, TEMPLATE_TYPE_EMAIL, TEMPLATE_TYPE_SMS from flask import current_app @@ -19,12 +23,35 @@ from utils.recipients import RecipientCSV, first_column_heading def process_job(job_id): start = datetime.utcnow() job = dao_get_job_by_id(job_id) + + service = job.service + + stats = dao_get_notification_statistics_for_service_and_day( + service_id=service.id, + day=job.created_at.strftime(DATE_FORMAT) + ) + + if stats: + sending_limit = service.limit 
+ job_size = job.notification_count + total_sent = stats.emails_requested + stats.sms_requested + + if total_sent + job_size > sending_limit: + finished = datetime.utcnow() + job.status = 'finished' + job.processing_finished = finished + dao_update_job(job) + current_app.logger.info( + "Job {} size {} error. Sending limits {} exceeded".format(job_id, job.notification_count, service.limit) + ) + return + job.status = 'in progress' dao_update_job(job) for recipient, personalisation in RecipientCSV( - s3.get_job_from_s3(job.bucket_name, job_id), - template_type=job.template.template_type + s3.get_job_from_s3(job.bucket_name, job_id), + template_type=job.template.template_type ).recipients_and_personalisation: encrypted = encryption.encrypt({ diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 830c39e0b..7aec58964 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -16,7 +16,9 @@ from freezegun import freeze_time from tests.app.conftest import ( sample_service, sample_user, - sample_template + sample_template, + sample_job, + sample_email_template ) @@ -41,6 +43,43 @@ def test_should_process_sms_job(sample_job, mocker): assert job.status == 'finished' +@freeze_time("2016-01-01 11:09:00.061258") +def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, notify_db_session, mocker): + service = sample_service(notify_db, notify_db_session, limit=9) + job = sample_job(notify_db, notify_db_session, service=service) + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) + mocker.patch('app.celery.tasks.send_sms.apply_async') + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + + process_job(job.id) + + s3.get_job_from_s3.assert_called_once_with(job.bucket_name, job.id) + job = jobs_dao.dao_get_job_by_id(job.id) + assert job.status == 'finished' + 
tasks.send_sms.apply_async.assert_not_called() + + +@freeze_time("2016-01-01 11:09:00.061258") +def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, notify_db_session, mocker): + service = sample_service(notify_db, notify_db_session, limit=9) + template = sample_email_template(notify_db, notify_db_session, service=service) + job = sample_job(notify_db, notify_db_session, service=service, template=template) + + mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_email')) + mocker.patch('app.celery.tasks.send_email.apply_async') + mocker.patch('app.encryption.encrypt', return_value="something_encrypted") + mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + + process_job(job.id) + + s3.get_job_from_s3.assert_called_once_with(job.bucket_name, job.id) + job = jobs_dao.dao_get_job_by_id(job.id) + assert job.status == 'finished' + tasks.send_email.apply_async.assert_not_called() + + def test_should_not_create_send_task_for_empty_file(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('empty')) mocker.patch('app.celery.tasks.send_sms.apply_async')