diff --git a/app/celery/tasks.py b/app/celery/tasks.py index d84424333..10852929d 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -24,6 +24,7 @@ from app.dao.templates_dao import dao_get_template_by_id from app.models import ( EMAIL_TYPE, SMS_TYPE, + LETTER_TYPE, KEY_TYPE_NORMAL ) from app.notifications.process_notifications import persist_notification @@ -58,33 +59,7 @@ def process_job(job_id): template_type=template.template_type, placeholders=template.placeholders ).enumerated_recipients_and_personalisation: - - encrypted = encryption.encrypt({ - 'template': str(template.id), - 'template_version': job.template_version, - 'job': str(job.id), - 'to': recipient, - 'row_number': row_number, - 'personalisation': dict(personalisation) - }) - - if template.template_type == SMS_TYPE: - send_sms.apply_async(( - str(job.service_id), - create_uuid(), - encrypted, - datetime.utcnow().strftime(DATETIME_FORMAT)), - queue='db-sms' if not service.research_mode else 'research-mode' - ) - - if template.template_type == EMAIL_TYPE: - send_email.apply_async(( - str(job.service_id), - create_uuid(), - encrypted, - datetime.utcnow().strftime(DATETIME_FORMAT)), - queue='db-email' if not service.research_mode else 'research-mode' - ) + process_row(row_number, recipient, personalisation, template, job, service) finished = datetime.utcnow() job.job_status = 'finished' @@ -96,6 +71,39 @@ def process_job(job_id): ) +def process_row(row_number, recipient, personalisation, template, job, service): + template_type = template.template_type + + encrypted = encryption.encrypt({ + 'template': str(template.id), + 'template_version': job.template_version, + 'job': str(job.id), + 'to': recipient, + 'row_number': row_number, + 'personalisation': dict(personalisation) + }) + + send_fns = { + SMS_TYPE: send_sms, + EMAIL_TYPE: send_email, + } + + queues = { + SMS_TYPE: 'db-sms', + EMAIL_TYPE: 'db-email', + } + + send_fn = send_fns[template_type] + + send_fn.apply_async(( + str(job.service_id), + create_uuid(), + encrypted, + datetime.utcnow().strftime(DATETIME_FORMAT)), + queue=queues[template_type] if not service.research_mode else 'research-mode' + ) + + def __sending_limits_for_job_exceeded(service, job, job_id): total_sent = fetch_todays_total_message_count(service.id) @@ -177,7 +185,8 @@ def send_sms(self, @notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") -def send_email(self, service_id, +def send_email(self, + service_id, notification_id, encrypted_notification, created_at, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 4315d24f6..2dd70329d 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -55,6 +55,14 @@ def test_should_have_decorated_tasks_functions(): assert send_email.__wrapped__.__name__ == 'send_email' +@pytest.fixture +def email_job_with_placeholders(notify_db, notify_db_session, sample_email_template_with_placeholders): + return sample_job(notify_db, notify_db_session, template=sample_email_template_with_placeholders) + + +# -------------- process_job tests -------------- # + + @freeze_time("2016-01-01 11:09:00.061258") def test_should_process_sms_job(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) @@ -83,61 +91,6 @@ def test_should_process_sms_job(sample_job, mocker): assert job.job_status == 'finished' -@freeze_time("2016-01-01 11:09:00.061258") -def test_should_process_sms_job_into_research_mode_queue_if_research_mode_service(notify_db, notify_db_session, mocker): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) - mocker.patch('app.celery.tasks.send_sms.apply_async') - mocker.patch('app.encryption.encrypt', return_value="something_encrypted") - mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") - - service = sample_service(notify_db, notify_db_session) - service.research_mode = True - services_dao.dao_update_service(service) - job = sample_job(notify_db, notify_db_session, service=service) - - process_job(job.id) - s3.get_job_from_s3.assert_called_once_with( - str(job.service.id), - str(job.id) - ) - tasks.send_sms.apply_async.assert_called_once_with( - (str(job.service_id), - "uuid", - "something_encrypted", - "2016-01-01T11:09:00.061258Z"), - queue="research-mode" - ) - - -@freeze_time("2016-01-01 11:09:00.061258") -def test_should_process_email_job_into_research_mode_queue_if_research_mode_service( - notify_db, notify_db_session, mocker -): - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) - mocker.patch('app.celery.tasks.send_email.apply_async') - mocker.patch('app.encryption.encrypt', return_value="something_encrypted") - mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") - - service = sample_service(notify_db, notify_db_session) - service.research_mode = True - services_dao.dao_update_service(service) - template = sample_email_template(notify_db, notify_db_session, service=service) - job = sample_job(notify_db, notify_db_session, template=template, service=service) - - process_job(job.id) - s3.get_job_from_s3.assert_called_once_with( - str(job.service.id), - str(job.id) - ) - tasks.send_email.apply_async.assert_called_once_with( - (str(job.service_id), - "uuid", - "something_encrypted", - "2016-01-01T11:09:00.061258Z"), - queue="research-mode" - ) - - @freeze_time("2016-01-01 11:09:00.061258") def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, notify_db_session, @@ -146,16 +99,14 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits(notify_db, job = sample_job(notify_db, notify_db_session, service=service, notification_count=10) mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('multiple_sms')) - mocker.patch('app.celery.tasks.send_sms.apply_async') - mocker.patch('app.encryption.encrypt', return_value="something_encrypted") - mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' assert s3.get_job_from_s3.called is False - assert tasks.send_sms.apply_async.called is False + assert tasks.process_row.called is False def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify_db, @@ -167,16 +118,14 @@ def test_should_not_process_sms_job_if_would_exceed_send_limits_inc_today(notify sample_notification(notify_db, notify_db_session, service=service, job=job) mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('sms')) - mocker.patch('app.celery.tasks.send_sms.apply_async') - mocker.patch('app.encryption.encrypt', return_value="something_encrypted") - mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") + mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' assert s3.get_job_from_s3.called is False - assert tasks.send_sms.apply_async.called is False + assert tasks.process_row.called is False def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(notify_db, notify_db_session, mocker): @@ -186,15 +135,15 @@ def test_should_not_process_email_job_if_would_exceed_send_limits_inc_today(noti sample_notification(notify_db, notify_db_session, service=service, job=job) - mocker.patch('app.celery.tasks.s3.get_job_from_s3', return_value=load_example_csv('email')) - mocker.patch('app.celery.tasks.send_email.apply_async') + mocker.patch('app.celery.tasks.s3.get_job_from_s3') + mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' assert s3.get_job_from_s3.called is False - assert tasks.send_email.apply_async.called is False + assert tasks.process_row.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -204,26 +153,26 @@ def test_should_not_process_email_job_if_would_exceed_send_limits(notify_db, not job = sample_job(notify_db, notify_db_session, service=service, template=template) mocker.patch('app.celery.tasks.s3.get_job_from_s3') - mocker.patch('app.celery.tasks.send_email.apply_async') + mocker.patch('app.celery.tasks.process_row') process_job(job.id) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'sending limits exceeded' assert s3.get_job_from_s3.called is False - assert tasks.send_email.apply_async.called is False + assert tasks.process_row.called is False def test_should_not_process_job_if_already_pending(notify_db, notify_db_session, mocker): job = sample_job(notify_db, notify_db_session, job_status='scheduled') mocker.patch('app.celery.tasks.s3.get_job_from_s3') - mocker.patch('app.celery.tasks.send_sms.apply_async') + mocker.patch('app.celery.tasks.process_row') process_job(job.id) assert s3.get_job_from_s3.called is False - assert tasks.send_sms.apply_async.called is False + assert tasks.process_row.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -270,11 +219,7 @@ def test_should_not_create_send_task_for_empty_file(sample_job, mocker): ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == 'finished' - - -@pytest.fixture -def email_job_with_placeholders(notify_db, notify_db_session, sample_email_template_with_placeholders): - return sample_job(notify_db, notify_db_session, template=sample_email_template_with_placeholders) + assert tasks.send_sms.apply_async.called is False @freeze_time("2016-01-01 11:09:00.061258") @@ -329,11 +274,16 @@ def test_should_process_all_sms_job(sample_job, assert encryption.encrypt.call_args[0][0][ 'template_version'] == sample_job_with_placeholdered_template.template.version # noqa assert encryption.encrypt.call_args[0][0]['personalisation'] == {'phonenumber': '+441234123120', 'name': 'chris'} - tasks.send_sms.apply_async.call_count == 10 + assert tasks.send_sms.apply_async.call_count == 10 job = jobs_dao.dao_get_job_by_id(sample_job_with_placeholdered_template.id) assert job.job_status == 'finished' +# -------------- process_row tests -------------- # + +# -------- send_sms and send_email tests -------- # + + def test_should_send_template_to_correct_sms_task_and_persist(sample_template_with_placeholders, mocker): notification = _notification_json(sample_template_with_placeholders, to="+447234123123", personalisation={"name": "Jo"}) @@ -639,28 +589,27 @@ def test_should_not_send_sms_if_team_key_and_recipient_not_in_team(notify_db, no def test_should_use_email_template_and_persist(sample_email_template_with_placeholders, sample_api_key, mocker): + mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') + + now = datetime(2016, 1, 1, 11, 9, 0) + notification_id = uuid.uuid4() + with freeze_time("2016-01-01 12:00:00.000000"): notification = _notification_json( sample_email_template_with_placeholders, 'my_email@my_email.com', {"name": "Jo"}, row_number=1) - mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') - notification_id = uuid.uuid4() - - with freeze_time("2016-01-01 11:09:00.00000"): - now = datetime.utcnow() - - with freeze_time("2016-01-01 11:10:00.00000"): - send_email( - sample_email_template_with_placeholders.service_id, - notification_id, - encryption.encrypt(notification), - now.strftime(DATETIME_FORMAT), - api_key_id=str(sample_api_key.id), - key_type=sample_api_key.key_type - ) + with freeze_time("2016-01-01 11:10:00.00000"): + send_email( + sample_email_template_with_placeholders.service_id, + notification_id, + encryption.encrypt(notification), + now.strftime(DATETIME_FORMAT), + api_key_id=str(sample_api_key.id), + key_type=sample_api_key.key_type + ) persisted_notification = Notification.query.one() assert persisted_notification.to == 'my_email@my_email.com'