diff --git a/app/celery/tasks.py b/app/celery/tasks.py index fbf060621..973c2de78 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,4 +1,5 @@ from datetime import (datetime) + from flask import current_app from notifications_utils.recipients import ( RecipientCSV @@ -17,6 +18,7 @@ from app.dao.jobs_dao import ( dao_update_job, dao_get_job_by_id ) +from app.dao.notifications_dao import get_notification_by_id from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count from app.dao.templates_dao import dao_get_template_by_id from app.models import ( @@ -27,8 +29,6 @@ from app.models import ( from app.notifications.process_notifications import persist_notification from app.service.utils import service_allowed_to_send_to from app.statsd_decorators import statsd -from app import redis_store -from app.clients.redis import daily_limit_cache_key @notify_celery.task(name="process-job") @@ -119,7 +119,6 @@ def send_sms(self, created_at, api_key_id=None, key_type=KEY_TYPE_NORMAL): - # notification_id is not used, it is created by the db model object notification = encryption.decrypt(encrypted_notification) service = dao_fetch_service_by_id(service_id) @@ -141,6 +140,7 @@ def send_sms(self, created_at=created_at, job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), + notification_id=notification_id ) provider_tasks.deliver_sms.apply_async( @@ -153,17 +153,21 @@ def send_sms(self, ) except SQLAlchemyError as e: - current_app.logger.exception( - "RETRY: send_sms notification for job {} row number {}".format( - notification.get('job', None), - notification.get('row_number', None)), e) - try: - raise self.retry(queue="retry", exc=e) - except self.MaxRetriesExceededError: + if not get_notification_by_id(notification_id): + # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems + # SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not + # send to the retry queue. current_app.logger.exception( - "RETRY FAILED: task send_sms failed for notification".format( + "RETRY: send_sms notification for job {} row number {}".format( notification.get('job', None), notification.get('row_number', None)), e) + try: + raise self.retry(queue="retry", exc=e) + except self.MaxRetriesExceededError: + current_app.logger.exception( + "RETRY FAILED: task send_sms failed for notification".format( + notification.get('job', None), + notification.get('row_number', None)), e) @notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300) @@ -174,7 +178,6 @@ def send_email(self, service_id, created_at, api_key_id=None, key_type=KEY_TYPE_NORMAL): - # notification_id is not used, it is created by the db model object notification = encryption.decrypt(encrypted_notification) service = dao_fetch_service_by_id(service_id) @@ -195,7 +198,7 @@ def send_email(self, service_id, created_at=created_at, job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), - + notification_id=notification_id ) provider_tasks.deliver_email.apply_async( @@ -205,12 +208,17 @@ def send_email(self, service_id, current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at)) except SQLAlchemyError as e: - current_app.logger.exception("RETRY: send_email notification".format(notification.get('job', None), - notification.get('row_number', None)), e) - try: - raise self.retry(queue="retry", exc=e) - except self.MaxRetriesExceededError: - current_app.logger.error( - "RETRY FAILED: task send_email failed for notification".format( - notification.get('job', None), - notification.get('row_number', None)), e) + if not get_notification_by_id(notification_id): + # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems + # SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not + # send to the retry queue. + current_app.logger.exception("RETRY: send_email notification".format(notification.get('job', None), + notification.get('row_number', None)), + e) + try: + raise self.retry(queue="retry", exc=e) + except self.MaxRetriesExceededError: + current_app.logger.error( + "RETRY FAILED: task send_email failed for notification".format( + notification.get('job', None), + notification.get('row_number', None)), e) diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index 1887eccc8..72b079011 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -4,7 +4,7 @@ from flask import current_app from notifications_utils.renderers import PassThrough from notifications_utils.template import Template -from app import DATETIME_FORMAT, redis_store +from app import redis_store from app.celery import provider_tasks from app.clients import redis from app.dao.notifications_dao import dao_create_notification, dao_delete_notifications_and_history_by_id @@ -48,8 +48,10 @@ def persist_notification(template_id, created_at=None, job_id=None, job_row_number=None, - reference=None): + reference=None, + notification_id=None): notification = Notification( + id=notification_id, template_id=template_id, template_version=template_version, to=recipient, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index c17533296..0b9d255dc 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -806,3 +806,41 @@ def test_send_email_should_go_to_retry_queue_if_database_errors(sample_email_tem tasks.send_email.retry.assert_called_with(exc=expected_exception, queue='retry') assert Notification.query.count() == 0 + + +def test_send_email_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker): + json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1) + deliver_email = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') + retry = mocker.patch('app.celery.tasks.send_email.retry', side_effect=Exception()) + now = datetime.utcnow() + + notification_id = sample_notification.id + + send_email( + sample_notification.service_id, + notification_id, + encryption.encrypt(json), + now.strftime(DATETIME_FORMAT) + ) + assert Notification.query.count() == 1 + assert not deliver_email.called + assert not retry.called + + +def test_send_sms_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker): + json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1) + deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') + retry = mocker.patch('app.celery.tasks.send_sms.retry', side_effect=Exception()) + now = datetime.utcnow() + + notification_id = sample_notification.id + + send_sms( + sample_notification.service_id, + notification_id, + encryption.encrypt(json), + now.strftime(DATETIME_FORMAT) + ) + assert Notification.query.count() == 1 + assert not deliver_sms.called + assert not retry.called diff --git a/tests/app/notifications/test_process_notification.py b/tests/app/notifications/test_process_notification.py index 3badaa554..79db94c96 100644 --- a/tests/app/notifications/test_process_notification.py +++ b/tests/app/notifications/test_process_notification.py @@ -1,4 +1,5 @@ import datetime +import uuid import pytest from boto3.exceptions import Boto3Error @@ -108,7 +109,7 @@ def test_persist_notification_with_optionals(sample_job, sample_api_key, mocker) assert Notification.query.count() == 0 assert NotificationHistory.query.count() == 0 mocked_redis = mocker.patch('app.notifications.process_notifications.redis_store.incr') - + n_id = uuid.uuid4() created_at = datetime.datetime(2016, 11, 11, 16, 8, 18) persist_notification(template_id=sample_job.template.id, template_version=sample_job.template.version, @@ -120,11 +121,13 @@ def test_persist_notification_with_optionals(sample_job, sample_api_key, mocker) created_at=created_at, job_id=sample_job.id, job_row_number=10, - reference="ref from client") + reference="ref from client", + notification_id=n_id) assert Notification.query.count() == 1 assert NotificationHistory.query.count() == 1 persisted_notification = Notification.query.all()[0] - assert persisted_notification.job_id == sample_job.id + assert persisted_notification.id == n_id + persisted_notification.job_id == sample_job.id assert persisted_notification.job_row_number == 10 assert persisted_notification.created_at == created_at mocked_redis.assert_called_once_with(str(sample_job.service_id) + "-2016-01-01-count") diff --git a/tests/app/notifications/test_validators.py b/tests/app/notifications/test_validators.py index 2e1974359..51b1e56db 100644 --- a/tests/app/notifications/test_validators.py +++ b/tests/app/notifications/test_validators.py @@ -1,3 +1,4 @@ +from datetime import datetime import pytest from freezegun import freeze_time @@ -21,10 +22,9 @@ from tests.app.conftest import ( @pytest.mark.parametrize('key_type', ['team', 'normal']) -def test_exception_thown_by_redis_store_get_should_not_be_fatal( +def test_exception_thrown_by_redis_store_get_should_not_be_fatal( notify_db, notify_db_session, - notify_api, key_type, mocker): mocker.patch('app.notifications.validators.redis_store.redis_store.get', side_effect=Exception("broken redis")) @@ -39,7 +39,8 @@ def test_exception_thown_by_redis_store_get_should_not_be_fatal( assert e.value.status_code == 429 assert e.value.message == 'Exceeded send limits (4) for today' assert e.value.fields == [] - app.notifications.validators.redis_store.set.assert_not_called() + app.notifications.validators.redis_store.set\ + .assert_called_with("{}-{}-count".format(service.id, datetime.utcnow().strftime("%Y-%m-%d")), 5, ex=3600) @pytest.mark.parametrize('key_type', ['test', 'team', 'normal'])