import json import uuid from datetime import datetime, timedelta from unittest.mock import Mock, call import pytest import requests_mock from celery.exceptions import Retry from freezegun import freeze_time from notifications_utils.recipients import Row from notifications_utils.template import ( PlainTextEmailTemplate, SMSMessageTemplate, ) from requests import RequestException from sqlalchemy.exc import SQLAlchemyError from app import encryption from app.celery import provider_tasks, tasks from app.celery.tasks import ( get_recipient_csv_and_template_and_sender_id, process_incomplete_job, process_incomplete_jobs, process_job, process_row, s3, save_api_email, save_api_sms, save_email, save_sms, send_inbound_sms_to_service, ) from app.config import QueueNames from app.dao import jobs_dao, service_email_reply_to_dao, service_sms_sender_dao from app.models import ( EMAIL_TYPE, JOB_STATUS_ERROR, JOB_STATUS_FINISHED, JOB_STATUS_IN_PROGRESS, KEY_TYPE_NORMAL, NOTIFICATION_CREATED, SMS_TYPE, Job, Notification, ) from app.serialised_models import SerialisedService, SerialisedTemplate from app.utils import DATETIME_FORMAT from tests.app import load_example_csv from tests.app.db import ( create_api_key, create_inbound_sms, create_job, create_notification, create_reply_to_email, create_service, create_service_inbound_api, create_service_with_defined_sms_sender, create_template, create_user, ) class AnyStringWith(str): def __eq__(self, other): return self in other def _notification_json(template, to, personalisation=None, job_id=None, row_number=0): return { "template": str(template.id), "template_version": template.version, "to": to, "notification_type": template.template_type, "personalisation": personalisation or {}, "job": job_id and str(job_id), "row_number": row_number } def test_should_have_decorated_tasks_functions(): assert process_job.__wrapped__.__name__ == 'process_job' assert save_sms.__wrapped__.__name__ == 'save_sms' assert save_email.__wrapped__.__name__ == 'save_email' @pytest.fixture def email_job_with_placeholders(notify_db_session, sample_email_template_with_placeholders): return create_job(template=sample_email_template_with_placeholders) # -------------- process_job tests -------------- # def test_should_process_sms_job(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('sms'), {'sender_id': None})) mocker.patch('app.celery.tasks.save_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(sample_job.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job.service.id), job_id=str(sample_job.id) ) assert encryption.encrypt.call_args[0][0]['to'] == '+441234123123' assert encryption.encrypt.call_args[0][0]['template'] == str(sample_job.template.id) assert encryption.encrypt.call_args[0][0]['template_version'] == sample_job.template.version assert encryption.encrypt.call_args[0][0]['personalisation'] == {'phonenumber': '+441234123123'} assert encryption.encrypt.call_args[0][0]['row_number'] == 0 tasks.save_sms.apply_async.assert_called_once_with( (str(sample_job.service_id), "uuid", "something_encrypted"), {}, queue="database-tasks" ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == 'finished' def test_should_process_sms_job_with_sender_id(sample_job, mocker, fake_uuid): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('sms'), {'sender_id': fake_uuid})) mocker.patch('app.celery.tasks.save_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(sample_job.id, sender_id=fake_uuid) tasks.save_sms.apply_async.assert_called_once_with( (str(sample_job.service_id), "uuid", "something_encrypted"), {'sender_id': fake_uuid}, queue="database-tasks" ) def test_should_not_process_job_if_already_pending(sample_template, mocker): job = create_job(template=sample_template, job_status='scheduled') mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3') mocker.patch('app.celery.tasks.process_row') process_job(job.id) assert s3.get_job_and_metadata_from_s3.called is False assert tasks.process_row.called is False def test_should_process_job_if_send_limits_are_not_exceeded( notify_api, notify_db_session, mocker ): service = create_service(message_limit=10) template = create_template(service=service, template_type='email') job = create_job(template=template, notification_count=10) mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_email'), {"sender_id": None})) mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(job.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(job.service.id), job_id=str(job.id) ) job = jobs_dao.dao_get_job_by_id(job.id) assert job.job_status == 'finished' tasks.save_email.apply_async.assert_called_with( ( str(job.service_id), "uuid", "something_encrypted", ), {}, queue="database-tasks" ) def test_should_not_create_save_task_for_empty_file(sample_job, mocker): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('empty'), {"sender_id": None})) mocker.patch('app.celery.tasks.save_sms.apply_async') process_job(sample_job.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job.service.id), job_id=str(sample_job.id) ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == 'finished' assert tasks.save_sms.apply_async.called is False def test_should_process_email_job(email_job_with_placeholders, mocker): email_csv = """email_address,name test@test.com,foo """ mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(email_csv, {"sender_id": None})) mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(email_job_with_placeholders.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(email_job_with_placeholders.service.id), job_id=str(email_job_with_placeholders.id) ) assert encryption.encrypt.call_args[0][0]['to'] == 'test@test.com' assert encryption.encrypt.call_args[0][0]['template'] == str(email_job_with_placeholders.template.id) assert encryption.encrypt.call_args[0][0]['template_version'] == email_job_with_placeholders.template.version assert encryption.encrypt.call_args[0][0]['personalisation'] == {'emailaddress': 'test@test.com', 'name': 'foo'} tasks.save_email.apply_async.assert_called_once_with( ( str(email_job_with_placeholders.service_id), "uuid", "something_encrypted", ), {}, queue="database-tasks" ) job = jobs_dao.dao_get_job_by_id(email_job_with_placeholders.id) assert job.job_status == 'finished' def test_should_process_email_job_with_sender_id(email_job_with_placeholders, mocker, fake_uuid): email_csv = """email_address,name test@test.com,foo """ mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(email_csv, {"sender_id": fake_uuid})) mocker.patch('app.celery.tasks.save_email.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(email_job_with_placeholders.id, sender_id=fake_uuid) tasks.save_email.apply_async.assert_called_once_with( (str(email_job_with_placeholders.service_id), "uuid", "something_encrypted"), {'sender_id': fake_uuid}, queue="database-tasks" ) def test_should_process_all_sms_job(sample_job_with_placeholdered_template, mocker): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {"sender_id": None})) mocker.patch('app.celery.tasks.save_sms.apply_async') mocker.patch('app.encryption.encrypt', return_value="something_encrypted") mocker.patch('app.celery.tasks.create_uuid', return_value="uuid") process_job(sample_job_with_placeholdered_template.id) s3.get_job_and_metadata_from_s3.assert_called_once_with( service_id=str(sample_job_with_placeholdered_template.service.id), job_id=str(sample_job_with_placeholdered_template.id) ) assert encryption.encrypt.call_args[0][0]['to'] == '+441234123120' assert encryption.encrypt.call_args[0][0]['template'] == str(sample_job_with_placeholdered_template.template.id) assert encryption.encrypt.call_args[0][0][ 'template_version'] == sample_job_with_placeholdered_template.template.version # noqa assert encryption.encrypt.call_args[0][0]['personalisation'] == {'phonenumber': '+441234123120', 'name': 'chris'} assert tasks.save_sms.apply_async.call_count == 10 job = jobs_dao.dao_get_job_by_id(sample_job_with_placeholdered_template.id) assert job.job_status == 'finished' # -------------- process_row tests -------------- # @pytest.mark.parametrize('template_type, research_mode, expected_function, expected_queue', [ (SMS_TYPE, False, 'save_sms', 'database-tasks'), (SMS_TYPE, True, 'save_sms', 'research-mode-tasks'), (EMAIL_TYPE, False, 'save_email', 'database-tasks'), (EMAIL_TYPE, True, 'save_email', 'research-mode-tasks'), ]) def test_process_row_sends_letter_task(template_type, research_mode, expected_function, expected_queue, mocker): mocker.patch('app.celery.tasks.create_uuid', return_value='noti_uuid') task_mock = mocker.patch('app.celery.tasks.{}.apply_async'.format(expected_function)) encrypt_mock = mocker.patch('app.celery.tasks.encryption.encrypt') template = Mock(id='template_id', template_type=template_type) job = Mock(id='job_id', template_version='temp_vers') service = Mock(id='service_id', research_mode=research_mode) process_row( Row( {'foo': 'bar', 'to': 'recip'}, index='row_num', error_fn=lambda k, v: None, recipient_column_headers=['to'], placeholders={'foo'}, template=template, allow_international_letters=True, ), template, job, service, ) encrypt_mock.assert_called_once_with({ 'template': 'template_id', 'template_version': 'temp_vers', 'job': 'job_id', 'to': 'recip', 'row_number': 'row_num', 'personalisation': {'foo': 'bar'} }) task_mock.assert_called_once_with( ( 'service_id', 'noti_uuid', # encrypted data encrypt_mock.return_value, ), {}, queue=expected_queue ) def test_process_row_when_sender_id_is_provided(mocker, fake_uuid): mocker.patch('app.celery.tasks.create_uuid', return_value='noti_uuid') task_mock = mocker.patch('app.celery.tasks.save_sms.apply_async') encrypt_mock = mocker.patch('app.celery.tasks.encryption.encrypt') template = Mock(id='template_id', template_type=SMS_TYPE) job = Mock(id='job_id', template_version='temp_vers') service = Mock(id='service_id', research_mode=False) process_row( Row( {'foo': 'bar', 'to': 'recip'}, index='row_num', error_fn=lambda k, v: None, recipient_column_headers=['to'], placeholders={'foo'}, template=template, allow_international_letters=True, ), template, job, service, sender_id=fake_uuid ) task_mock.assert_called_once_with( ( 'service_id', 'noti_uuid', # encrypted data encrypt_mock.return_value, ), {'sender_id': fake_uuid}, queue='database-tasks' ) # -------- save_sms and save_email tests -------- # def test_should_send_template_to_correct_sms_task_and_persist(sample_template_with_placeholders, mocker): notification = _notification_json(sample_template_with_placeholders, to="+447234123123", personalisation={"name": "Jo"}) mocked_deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') save_sms( sample_template_with_placeholders.service_id, uuid.uuid4(), encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.to == '+447234123123' assert persisted_notification.template_id == sample_template_with_placeholders.id assert persisted_notification.template_version == sample_template_with_placeholders.version assert persisted_notification.status == 'created' assert persisted_notification.created_at <= datetime.utcnow() assert not persisted_notification.sent_at assert not persisted_notification.sent_by assert not persisted_notification.job_id assert persisted_notification.personalisation == {'name': 'Jo'} assert persisted_notification.notification_type == 'sms' mocked_deliver_sms.assert_called_once_with( [str(persisted_notification.id)], queue="send-sms-tasks" ) def test_should_put_save_sms_task_in_research_mode_queue_if_research_mode_service(notify_db_session, mocker): service = create_service(research_mode=True, ) template = create_template(service=service) notification = _notification_json(template, to="+447234123123") mocked_deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() save_sms( template.service_id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() provider_tasks.deliver_sms.apply_async.assert_called_once_with( [str(persisted_notification.id)], queue="research-mode-tasks" ) assert mocked_deliver_sms.called def test_should_save_sms_if_restricted_service_and_valid_number(notify_db_session, mocker): user = create_user(mobile_number="202-867-5309") service = create_service(user=user, restricted=True) template = create_template(service=service) notification = _notification_json(template, "+12028675309") # The user’s own number, but in a different format mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() encrypt_notification = encryption.encrypt(notification) save_sms( service.id, notification_id, encrypt_notification, ) persisted_notification = Notification.query.one() assert persisted_notification.to == '+12028675309' assert persisted_notification.template_id == template.id assert persisted_notification.template_version == template.version assert persisted_notification.status == 'created' assert persisted_notification.created_at <= datetime.utcnow() assert not persisted_notification.sent_at assert not persisted_notification.sent_by assert not persisted_notification.job_id assert not persisted_notification.personalisation assert persisted_notification.notification_type == 'sms' provider_tasks.deliver_sms.apply_async.assert_called_once_with( [str(persisted_notification.id)], queue="send-sms-tasks" ) def test_save_email_should_save_default_email_reply_to_text_on_notification(notify_db_session, mocker): service = create_service() create_reply_to_email(service=service, email_address='reply_to@digital.fake.gov', is_default=True) template = create_template(service=service, template_type='email', subject='Hello') notification = _notification_json(template, to="test@example.com") mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') notification_id = uuid.uuid4() save_email( service.id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == 'reply_to@digital.fake.gov' def test_save_sms_should_save_default_sms_sender_notification_reply_to_text_on(notify_db_session, mocker): service = create_service_with_defined_sms_sender(sms_sender_value='12345') template = create_template(service=service) notification = _notification_json(template, to="2028675309") mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() save_sms( service.id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == '12345' def test_should_not_save_sms_if_restricted_service_and_invalid_number(notify_db_session, mocker): user = create_user(mobile_number="2028675309") service = create_service(user=user, restricted=True) template = create_template(service=service) notification = _notification_json(template, "2028675400") mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() save_sms( service.id, notification_id, encryption.encrypt(notification), ) assert provider_tasks.deliver_sms.apply_async.called is False assert Notification.query.count() == 0 def test_should_not_save_email_if_restricted_service_and_invalid_email_address(notify_db_session, mocker): user = create_user() service = create_service(user=user, restricted=True) template = create_template(service=service, template_type='email', subject='Hello') notification = _notification_json(template, to="test@example.com") notification_id = uuid.uuid4() save_email( service.id, notification_id, encryption.encrypt(notification), ) assert Notification.query.count() == 0 def test_should_put_save_email_task_in_research_mode_queue_if_research_mode_service( notify_db_session, mocker ): service = create_service(research_mode=True) template = create_template(service=service, template_type='email') notification = _notification_json(template, to="test@test.com") mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') notification_id = uuid.uuid4() save_email( template.service_id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() provider_tasks.deliver_email.apply_async.assert_called_once_with( [str(persisted_notification.id)], queue="research-mode-tasks" ) def test_should_save_sms_template_to_and_persist_with_job_id(sample_job, mocker): notification = _notification_json( sample_job.template, to="+447234123123", job_id=sample_job.id, row_number=2) mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() now = datetime.utcnow() save_sms( sample_job.service.id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.to == '+447234123123' assert persisted_notification.job_id == sample_job.id assert persisted_notification.template_id == sample_job.template.id assert persisted_notification.status == 'created' assert not persisted_notification.sent_at assert persisted_notification.created_at >= now assert not persisted_notification.sent_by assert persisted_notification.job_row_number == 2 assert persisted_notification.api_key_id is None assert persisted_notification.key_type == KEY_TYPE_NORMAL assert persisted_notification.notification_type == 'sms' provider_tasks.deliver_sms.apply_async.assert_called_once_with( [str(persisted_notification.id)], queue="send-sms-tasks" ) def test_should_not_save_sms_if_team_key_and_recipient_not_in_team(notify_db_session, mocker): assert Notification.query.count() == 0 user = create_user(mobile_number="2028675309") service = create_service(user=user, restricted=True) template = create_template(service=service) team_members = [user.mobile_number for user in service.users] assert "07890 300000" not in team_members notification = _notification_json(template, "2028675400") mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() save_sms( service.id, notification_id, encryption.encrypt(notification), ) assert provider_tasks.deliver_sms.apply_async.called is False assert Notification.query.count() == 0 def test_should_use_email_template_and_persist(sample_email_template_with_placeholders, sample_api_key, mocker): mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') now = datetime(2016, 1, 1, 11, 9, 0) notification_id = uuid.uuid4() with freeze_time("2016-01-01 12:00:00.000000"): notification = _notification_json( sample_email_template_with_placeholders, 'my_email@my_email.com', {"name": "Jo"}, row_number=1) with freeze_time("2016-01-01 11:10:00.00000"): save_email( sample_email_template_with_placeholders.service_id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.to == 'my_email@my_email.com' assert persisted_notification.template_id == sample_email_template_with_placeholders.id assert persisted_notification.template_version == sample_email_template_with_placeholders.version assert persisted_notification.created_at >= now assert not persisted_notification.sent_at assert persisted_notification.status == 'created' assert not persisted_notification.sent_by assert persisted_notification.job_row_number == 1 assert persisted_notification.personalisation == {'name': 'Jo'} assert persisted_notification.api_key_id is None assert persisted_notification.key_type == KEY_TYPE_NORMAL assert persisted_notification.notification_type == 'email' provider_tasks.deliver_email.apply_async.assert_called_once_with( [str(persisted_notification.id)], queue='send-email-tasks') def test_save_email_should_use_template_version_from_job_not_latest(sample_email_template, mocker): notification = _notification_json(sample_email_template, 'my_email@my_email.com') version_on_notification = sample_email_template.version # Change the template from app.dao.templates_dao import ( dao_get_template_by_id, dao_update_template, ) sample_email_template.content = sample_email_template.content + " another version of the template" mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') dao_update_template(sample_email_template) t = dao_get_template_by_id(sample_email_template.id) assert t.version > version_on_notification now = datetime.utcnow() save_email( sample_email_template.service_id, uuid.uuid4(), encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.to == 'my_email@my_email.com' assert persisted_notification.template_id == sample_email_template.id assert persisted_notification.template_version == version_on_notification assert persisted_notification.created_at >= now assert not persisted_notification.sent_at assert persisted_notification.status == 'created' assert not persisted_notification.sent_by assert persisted_notification.notification_type == 'email' provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue='send-email-tasks') def test_should_use_email_template_subject_placeholders(sample_email_template_with_placeholders, mocker): notification = _notification_json(sample_email_template_with_placeholders, "my_email@my_email.com", {"name": "Jo"}) mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') notification_id = uuid.uuid4() now = datetime.utcnow() save_email( sample_email_template_with_placeholders.service_id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.to == 'my_email@my_email.com' assert persisted_notification.template_id == sample_email_template_with_placeholders.id assert persisted_notification.status == 'created' assert persisted_notification.created_at >= now assert not persisted_notification.sent_by assert persisted_notification.personalisation == {"name": "Jo"} assert not persisted_notification.reference assert persisted_notification.notification_type == 'email' provider_tasks.deliver_email.apply_async.assert_called_once_with( [str(persisted_notification.id)], queue='send-email-tasks' ) def test_save_email_uses_the_reply_to_text_when_provided(sample_email_template, mocker): notification = _notification_json(sample_email_template, "my_email@my_email.com") mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') service = sample_email_template.service notification_id = uuid.uuid4() service_email_reply_to_dao.add_reply_to_email_address_for_service(service.id, 'default@example.com', True) other_email_reply_to = service_email_reply_to_dao.add_reply_to_email_address_for_service( service.id, 'other@example.com', False) save_email( sample_email_template.service_id, notification_id, encryption.encrypt(notification), sender_id=other_email_reply_to.id, ) persisted_notification = Notification.query.one() assert persisted_notification.notification_type == 'email' assert persisted_notification.reply_to_text == 'other@example.com' def test_save_email_uses_the_default_reply_to_text_if_sender_id_is_none(sample_email_template, mocker): notification = _notification_json(sample_email_template, "my_email@my_email.com") mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') service = sample_email_template.service notification_id = uuid.uuid4() service_email_reply_to_dao.add_reply_to_email_address_for_service(service.id, 'default@example.com', True) save_email( sample_email_template.service_id, notification_id, encryption.encrypt(notification), sender_id=None, ) persisted_notification = Notification.query.one() assert persisted_notification.notification_type == 'email' assert persisted_notification.reply_to_text == 'default@example.com' def test_should_use_email_template_and_persist_without_personalisation(sample_email_template, mocker): notification = _notification_json(sample_email_template, "my_email@my_email.com") mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') notification_id = uuid.uuid4() now = datetime.utcnow() save_email( sample_email_template.service_id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.to == 'my_email@my_email.com' assert persisted_notification.template_id == sample_email_template.id assert persisted_notification.created_at >= now assert not persisted_notification.sent_at assert persisted_notification.status == 'created' assert not persisted_notification.sent_by assert not persisted_notification.personalisation assert not persisted_notification.reference assert persisted_notification.notification_type == 'email' provider_tasks.deliver_email.apply_async.assert_called_once_with([str(persisted_notification.id)], queue='send-email-tasks') def test_save_sms_should_go_to_retry_queue_if_database_errors(sample_template, mocker): notification = _notification_json(sample_template, "+447234123123") expected_exception = SQLAlchemyError() mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') mocker.patch('app.celery.tasks.save_sms.retry', side_effect=Retry) mocker.patch('app.notifications.process_notifications.dao_create_notification', side_effect=expected_exception) notification_id = uuid.uuid4() with pytest.raises(Retry): save_sms( sample_template.service_id, notification_id, encryption.encrypt(notification), ) assert provider_tasks.deliver_sms.apply_async.called is False tasks.save_sms.retry.assert_called_with(exc=expected_exception, queue="retry-tasks") assert Notification.query.count() == 0 def test_save_email_should_go_to_retry_queue_if_database_errors(sample_email_template, mocker): notification = _notification_json(sample_email_template, "test@example.gov.uk") expected_exception = SQLAlchemyError() mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') mocker.patch('app.celery.tasks.save_email.retry', side_effect=Retry) mocker.patch('app.notifications.process_notifications.dao_create_notification', side_effect=expected_exception) notification_id = uuid.uuid4() with pytest.raises(Retry): save_email( sample_email_template.service_id, notification_id, encryption.encrypt(notification), ) assert not provider_tasks.deliver_email.apply_async.called tasks.save_email.retry.assert_called_with(exc=expected_exception, queue="retry-tasks") assert Notification.query.count() == 0 def test_save_email_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker): json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1) deliver_email = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') retry = mocker.patch('app.celery.tasks.save_email.retry', side_effect=Exception()) notification_id = sample_notification.id save_email( sample_notification.service_id, notification_id, encryption.encrypt(json), ) assert Notification.query.count() == 1 assert not deliver_email.called assert not retry.called def test_save_sms_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker): json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1) deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') retry = mocker.patch('app.celery.tasks.save_sms.retry', side_effect=Exception()) notification_id = sample_notification.id save_sms( sample_notification.service_id, notification_id, encryption.encrypt(json), ) assert Notification.query.count() == 1 assert not deliver_sms.called assert not retry.called def test_save_sms_uses_sms_sender_reply_to_text(mocker, notify_db_session): service = create_service_with_defined_sms_sender(sms_sender_value='2028675309') template = create_template(service=service) notification = _notification_json(template, to="2028675301") mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() save_sms( service.id, notification_id, encryption.encrypt(notification), ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == '+12028675309' def test_save_sms_uses_non_default_sms_sender_reply_to_text_if_provided(mocker, notify_db_session): service = create_service_with_defined_sms_sender(sms_sender_value='2028675309') template = create_template(service=service) new_sender = service_sms_sender_dao.dao_add_sms_sender_for_service(service.id, 'new-sender', False) notification = _notification_json(template, to="202-867-5301") mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') notification_id = uuid.uuid4() save_sms( service.id, notification_id, encryption.encrypt(notification), sender_id=new_sender.id, ) persisted_notification = Notification.query.one() assert persisted_notification.reply_to_text == 'new-sender' def test_should_cancel_job_if_service_is_inactive(sample_service, sample_job, mocker): sample_service.active = False mocker.patch('app.celery.tasks.s3.get_job_from_s3') mocker.patch('app.celery.tasks.process_row') process_job(sample_job.id) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == 'cancelled' s3.get_job_from_s3.assert_not_called() tasks.process_row.assert_not_called() def test_get_email_template_instance(mocker, sample_email_template, sample_job): mocker.patch( 'app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=('', {}), ) sample_job.template_id = sample_email_template.id ( recipient_csv, template, _sender_id, ) = get_recipient_csv_and_template_and_sender_id(sample_job) assert isinstance(template, PlainTextEmailTemplate) assert recipient_csv.placeholders == [ 'email address' ] def test_get_sms_template_instance(mocker, sample_template, sample_job): mocker.patch( 'app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=('', {}), ) sample_job.template = sample_template ( recipient_csv, template, _sender_id, ) = get_recipient_csv_and_template_and_sender_id(sample_job) assert isinstance(template, SMSMessageTemplate) assert recipient_csv.placeholders == [ 'phone number' ] def test_send_inbound_sms_to_service_post_https_request_to_service(notify_api, sample_service): inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/", bearer_token="something_unique") inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", provider_date=datetime(2017, 6, 20), content="Here is some content") data = { "id": str(inbound_sms.id), "source_number": inbound_sms.user_number, "destination_number": inbound_sms.notify_number, "message": inbound_sms.content, "date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT) } with requests_mock.Mocker() as request_mock: request_mock.post(inbound_api.url, json={}, status_code=200) send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) assert request_mock.call_count == 1 assert request_mock.request_history[0].url == inbound_api.url assert request_mock.request_history[0].method == 'POST' assert request_mock.request_history[0].text == json.dumps(data) assert request_mock.request_history[0].headers["Content-type"] == "application/json" assert request_mock.request_history[0].headers["Authorization"] == "Bearer {}".format(inbound_api.bearer_token) def test_send_inbound_sms_to_service_does_not_send_request_when_inbound_sms_does_not_exist(notify_api, sample_service): inbound_api = create_service_inbound_api(service=sample_service) with requests_mock.Mocker() as request_mock: request_mock.post(inbound_api.url, json={}, status_code=200) with pytest.raises(SQLAlchemyError): send_inbound_sms_to_service(inbound_sms_id=uuid.uuid4(), service_id=sample_service.id) assert request_mock.call_count == 0 def test_send_inbound_sms_to_service_does_not_sent_request_when_inbound_api_does_not_exist( notify_api, sample_service, mocker): inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", provider_date=datetime(2017, 6, 20), content="Here is some content") mocked = mocker.patch("requests.request") send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) assert mocked.call_count == 0 def test_send_inbound_sms_to_service_retries_if_request_returns_500(notify_api, sample_service, mocker): inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/", bearer_token="something_unique") inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", provider_date=datetime(2017, 6, 20), content="Here is some content") mocked = mocker.patch('app.celery.tasks.send_inbound_sms_to_service.retry') with requests_mock.Mocker() as request_mock: request_mock.post(inbound_api.url, json={}, status_code=500) send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) assert mocked.call_count == 1 assert mocked.call_args[1]['queue'] == 'retry-tasks' def test_send_inbound_sms_to_service_retries_if_request_throws_unknown(notify_api, sample_service, mocker): create_service_inbound_api( service=sample_service, url="https://some.service.gov.uk/", bearer_token="something_unique") inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", provider_date=datetime(2017, 6, 20), content="Here is some content") mocked = mocker.patch('app.celery.tasks.send_inbound_sms_to_service.retry') mocker.patch("app.celery.tasks.request", side_effect=RequestException()) send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) assert mocked.call_count == 1 assert mocked.call_args[1]['queue'] == 'retry-tasks' def test_send_inbound_sms_to_service_does_not_retries_if_request_returns_404(notify_api, sample_service, mocker): inbound_api = create_service_inbound_api(service=sample_service, url="https://some.service.gov.uk/", bearer_token="something_unique") inbound_sms = create_inbound_sms(service=sample_service, notify_number="0751421", user_number="447700900111", provider_date=datetime(2017, 6, 20), content="Here is some content") mocked = mocker.patch('app.celery.tasks.send_inbound_sms_to_service.retry') with requests_mock.Mocker() as request_mock: request_mock.post(inbound_api.url, json={}, status_code=404) send_inbound_sms_to_service(inbound_sms.id, inbound_sms.service_id) assert mocked.call_count == 0 def test_process_incomplete_job_sms(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) assert Notification.query.filter(Notification.job_id == job.id).count() == 2 process_incomplete_job(str(job.id)) completed_job = Job.query.filter(Job.id == job.id).one() assert completed_job.job_status == JOB_STATUS_FINISHED assert save_sms.call_count == 8 # There are 10 in the file and we've added two already def test_process_incomplete_job_with_notifications_all_sent(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) create_notification(sample_template, job, 2) create_notification(sample_template, job, 3) create_notification(sample_template, job, 4) create_notification(sample_template, job, 5) create_notification(sample_template, job, 6) create_notification(sample_template, job, 7) create_notification(sample_template, job, 8) create_notification(sample_template, job, 9) assert Notification.query.filter(Notification.job_id == job.id).count() == 10 process_incomplete_job(str(job.id)) completed_job = Job.query.filter(Job.id == job.id).one() assert completed_job.job_status == JOB_STATUS_FINISHED assert mock_save_sms.call_count == 0 # There are 10 in the file and we've added 10 it should not have been called def test_process_incomplete_jobs_sms(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR) create_notification(sample_template, job, 0) create_notification(sample_template, job, 1) create_notification(sample_template, job, 2) assert Notification.query.filter(Notification.job_id == job.id).count() == 3 job2 = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR) create_notification(sample_template, job2, 0) create_notification(sample_template, job2, 1) create_notification(sample_template, job2, 2) create_notification(sample_template, job2, 3) create_notification(sample_template, job2, 4) assert Notification.query.filter(Notification.job_id == job2.id).count() == 5 jobs = [job.id, job2.id] process_incomplete_jobs(jobs) completed_job = Job.query.filter(Job.id == job.id).one() completed_job2 = Job.query.filter(Job.id == job2.id).one() assert completed_job.job_status == JOB_STATUS_FINISHED assert completed_job2.job_status == JOB_STATUS_FINISHED assert mock_save_sms.call_count == 12 # There are 20 in total over 2 jobs we've added 8 already def test_process_incomplete_jobs_no_notifications_added(mocker, sample_template): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') job = create_job(template=sample_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR) assert Notification.query.filter(Notification.job_id == job.id).count() == 0 process_incomplete_job(job.id) completed_job = Job.query.filter(Job.id == job.id).one() assert completed_job.job_status == JOB_STATUS_FINISHED assert mock_save_sms.call_count == 10 # There are 10 in the csv file def test_process_incomplete_jobs(mocker): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') jobs = [] process_incomplete_jobs(jobs) assert mock_save_sms.call_count == 0 # There are no jobs to process so it will not have been called def test_process_incomplete_job_no_job_in_database(mocker, fake_uuid): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_sms'), {'sender_id': None})) mock_save_sms = mocker.patch('app.celery.tasks.save_sms.apply_async') with pytest.raises(expected_exception=Exception): process_incomplete_job(fake_uuid) assert mock_save_sms.call_count == 0 # There is no job in the db it will not have been called def test_process_incomplete_job_email(mocker, sample_email_template): mocker.patch('app.celery.tasks.s3.get_job_and_metadata_from_s3', return_value=(load_example_csv('multiple_email'), {'sender_id': None})) mock_email_saver = mocker.patch('app.celery.tasks.save_email.apply_async') job = create_job(template=sample_email_template, notification_count=10, created_at=datetime.utcnow() - timedelta(hours=2), scheduled_for=datetime.utcnow() - timedelta(minutes=31), processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR) create_notification(sample_email_template, job, 0) create_notification(sample_email_template, job, 1) assert Notification.query.filter(Notification.job_id == job.id).count() == 2 process_incomplete_job(str(job.id)) completed_job = Job.query.filter(Job.id == job.id).one() assert completed_job.job_status == JOB_STATUS_FINISHED assert mock_email_saver.call_count == 8 # There are 10 in the file and we've added two already @freeze_time('2017-01-01') def test_process_incomplete_jobs_sets_status_to_in_progress_and_resets_processing_started_time(mocker, sample_template): mock_process_incomplete_job = mocker.patch('app.celery.tasks.process_incomplete_job') job1 = create_job( sample_template, processing_started=datetime.utcnow() - timedelta(minutes=30), job_status=JOB_STATUS_ERROR ) job2 = create_job( sample_template, processing_started=datetime.utcnow() - timedelta(minutes=31), job_status=JOB_STATUS_ERROR ) process_incomplete_jobs([str(job1.id), str(job2.id)]) assert job1.job_status == JOB_STATUS_IN_PROGRESS assert job1.processing_started == datetime.utcnow() assert job2.job_status == JOB_STATUS_IN_PROGRESS assert job2.processing_started == datetime.utcnow() assert mock_process_incomplete_job.mock_calls == [call(str(job1.id)), call(str(job2.id))] @freeze_time('2020-03-25 14:30') @pytest.mark.parametrize('notification_type', ['sms', 'email']) def test_save_api_email_or_sms(mocker, sample_service, notification_type): template = create_template(sample_service) if notification_type == SMS_TYPE \ else create_template(sample_service, template_type=EMAIL_TYPE) mock_provider_task = mocker.patch(f'app.celery.provider_tasks.deliver_{notification_type}.apply_async') api_key = create_api_key(service=template.service) data = { "id": str(uuid.uuid4()), "template_id": str(template.id), "template_version": template.version, "service_id": str(template.service_id), "personalisation": None, "notification_type": template.template_type, "api_key_id": str(api_key.id), "key_type": api_key.key_type, "client_reference": 'our email', "reply_to_text": None, "document_download_count": 0, "status": NOTIFICATION_CREATED, "created_at": datetime.utcnow().strftime(DATETIME_FORMAT), } if notification_type == EMAIL_TYPE: data.update({"to": "jane.citizen@example.com"}) expected_queue = QueueNames.SEND_EMAIL else: data.update({"to": "+447700900855"}) expected_queue = QueueNames.SEND_SMS encrypted = encryption.encrypt( data ) assert len(Notification.query.all()) == 0 if notification_type == EMAIL_TYPE: save_api_email(encrypted_notification=encrypted) else: save_api_sms(encrypted_notification=encrypted) notifications = Notification.query.all() assert len(notifications) == 1 assert str(notifications[0].id) == data['id'] assert notifications[0].created_at == datetime(2020, 3, 25, 14, 30) assert notifications[0].notification_type == notification_type mock_provider_task.assert_called_once_with([data['id']], queue=expected_queue) @freeze_time('2020-03-25 14:30') @pytest.mark.parametrize('notification_type', ['sms', 'email']) def test_save_api_email_dont_retry_if_notification_already_exists(sample_service, mocker, notification_type): template = create_template(sample_service) if notification_type == SMS_TYPE \ else create_template(sample_service, template_type=EMAIL_TYPE) mock_provider_task = mocker.patch(f'app.celery.provider_tasks.deliver_{notification_type}.apply_async') api_key = create_api_key(service=template.service) data = { "id": str(uuid.uuid4()), "template_id": str(template.id), "template_version": template.version, "service_id": str(template.service_id), "personalisation": None, "notification_type": template.template_type, "api_key_id": str(api_key.id), "key_type": api_key.key_type, "client_reference": 'our email', "reply_to_text": "our.email@gov.uk", "document_download_count": 0, "status": NOTIFICATION_CREATED, "created_at": datetime.utcnow().strftime(DATETIME_FORMAT), } if notification_type == EMAIL_TYPE: data.update({"to": "jane.citizen@example.com"}) expected_queue = QueueNames.SEND_EMAIL else: data.update({"to": "+447700900855"}) expected_queue = QueueNames.SEND_SMS encrypted = encryption.encrypt( data ) assert len(Notification.query.all()) == 0 if notification_type == EMAIL_TYPE: save_api_email(encrypted_notification=encrypted) else: save_api_sms(encrypted_notification=encrypted) notifications = Notification.query.all() assert len(notifications) == 1 # call the task again with the same notification if notification_type == EMAIL_TYPE: save_api_email(encrypted_notification=encrypted) else: save_api_sms(encrypted_notification=encrypted) notifications = Notification.query.all() assert len(notifications) == 1 assert str(notifications[0].id) == data['id'] assert notifications[0].created_at == datetime(2020, 3, 25, 14, 30) # should only have sent the notification once. mock_provider_task.assert_called_once_with([data['id']], queue=expected_queue) @pytest.mark.parametrize('task_function, delivery_mock, recipient, template_args', ( ( save_email, 'app.celery.provider_tasks.deliver_email.apply_async', 'test@example.com', {'template_type': 'email', 'subject': 'Hello'}, ), ( save_sms, 'app.celery.provider_tasks.deliver_sms.apply_async', '202-867-5309', {'template_type': 'sms'} ), )) def test_save_tasks_use_cached_service_and_template( notify_db_session, mocker, task_function, delivery_mock, recipient, template_args, ): service = create_service() template = create_template(service=service, **template_args) notification = _notification_json(template, to=recipient) delivery_mock = mocker.patch(delivery_mock) service_dict_mock = mocker.patch( 'app.serialised_models.SerialisedService.get_dict', wraps=SerialisedService.get_dict, ) template_dict_mock = mocker.patch( 'app.serialised_models.SerialisedTemplate.get_dict', wraps=SerialisedTemplate.get_dict, ) for _ in range(2): task_function( service.id, uuid.uuid4(), encryption.encrypt(notification), ) # We talk to the database once for the service and once for the # template; subsequent calls are caught by the in memory cache assert service_dict_mock.call_args_list == [ call(service.id), ] assert template_dict_mock.call_args_list == [ call(str(template.id), str(service.id), 1), ] # But we save 2 notifications and enqueue 2 tasks assert len(Notification.query.all()) == 2 assert len(delivery_mock.call_args_list) == 2 @freeze_time('2020-03-25 14:30') @pytest.mark.parametrize('notification_type, task_function, expected_queue, recipient', ( ('sms', save_api_sms, QueueNames.SEND_SMS, '+447700900855'), ('email', save_api_email, QueueNames.SEND_EMAIL, 'jane.citizen@example.com'), )) def test_save_api_tasks_use_cache( sample_service, mocker, notification_type, task_function, expected_queue, recipient, ): mock_provider_task = mocker.patch( f'app.celery.provider_tasks.deliver_{notification_type}.apply_async' ) service_dict_mock = mocker.patch( 'app.serialised_models.SerialisedService.get_dict', wraps=SerialisedService.get_dict, ) template = create_template(sample_service, template_type=notification_type) api_key = create_api_key(service=template.service) def create_encrypted_notification(): return encryption.encrypt({ "to": recipient, "id": str(uuid.uuid4()), "template_id": str(template.id), "template_version": template.version, "service_id": str(template.service_id), "personalisation": None, "notification_type": template.template_type, "api_key_id": str(api_key.id), "key_type": api_key.key_type, "client_reference": 'our email', "reply_to_text": "our.email@gov.uk", "document_download_count": 0, "status": NOTIFICATION_CREATED, "created_at": datetime.utcnow().strftime(DATETIME_FORMAT), }) assert len(Notification.query.all()) == 0 for _ in range(3): task_function(encrypted_notification=create_encrypted_notification()) assert service_dict_mock.call_args_list == [ call(str(template.service_id)) ] assert len(Notification.query.all()) == 3 assert len(mock_provider_task.call_args_list) == 3