From be113e031fed7ea5c767657bff6d7514312915ba Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Fri, 25 Nov 2016 17:32:01 +0000 Subject: [PATCH] Sometimes a message is picked up twice off the SQS queue, so we need to safeguard ourselves against that. In this PR the id for the notification is passed in and used to create the notification, which causes an integrity error when the same message is processed a second time. Normally when we get a SQLAlchemy error here we send the message to the retry queue, but if the notification already exists we just ignore it. --- app/celery/tasks.py | 52 +++++++++++-------- app/notifications/process_notifications.py | 6 ++- tests/app/celery/test_tasks.py | 38 ++++++++++++++ .../test_process_notification.py | 9 ++-- tests/app/notifications/test_validators.py | 7 +-- 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index fbf060621..973c2de78 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,4 +1,5 @@ from datetime import (datetime) + from flask import current_app from notifications_utils.recipients import ( RecipientCSV @@ -17,6 +18,7 @@ from app.dao.jobs_dao import ( dao_update_job, dao_get_job_by_id ) +from app.dao.notifications_dao import get_notification_by_id from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count from app.dao.templates_dao import dao_get_template_by_id from app.models import ( @@ -27,8 +29,6 @@ from app.models import ( from app.notifications.process_notifications import persist_notification from app.service.utils import service_allowed_to_send_to from app.statsd_decorators import statsd -from app import redis_store -from app.clients.redis import daily_limit_cache_key @notify_celery.task(name="process-job") @@ -119,7 +119,6 @@ def send_sms(self, created_at, api_key_id=None, key_type=KEY_TYPE_NORMAL): - # notification_id is not used, it is created by the db model object notification = encryption.decrypt(encrypted_notification) service = 
dao_fetch_service_by_id(service_id) @@ -141,6 +140,7 @@ def send_sms(self, created_at=created_at, job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), + notification_id=notification_id ) provider_tasks.deliver_sms.apply_async( @@ -153,17 +153,21 @@ def send_sms(self, ) except SQLAlchemyError as e: - current_app.logger.exception( - "RETRY: send_sms notification for job {} row number {}".format( - notification.get('job', None), - notification.get('row_number', None)), e) - try: - raise self.retry(queue="retry", exc=e) - except self.MaxRetriesExceededError: + if not get_notification_by_id(notification_id): + # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems + # SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not + # send to the retry queue. current_app.logger.exception( - "RETRY FAILED: task send_sms failed for notification".format( + "RETRY: send_sms notification for job {} row number {}".format( notification.get('job', None), notification.get('row_number', None)), e) + try: + raise self.retry(queue="retry", exc=e) + except self.MaxRetriesExceededError: + current_app.logger.exception( + "RETRY FAILED: task send_sms failed for notification".format( + notification.get('job', None), + notification.get('row_number', None)), e) @notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300) @@ -174,7 +178,6 @@ def send_email(self, service_id, created_at, api_key_id=None, key_type=KEY_TYPE_NORMAL): - # notification_id is not used, it is created by the db model object notification = encryption.decrypt(encrypted_notification) service = dao_fetch_service_by_id(service_id) @@ -195,7 +198,7 @@ def send_email(self, service_id, created_at=created_at, job_id=notification.get('job', None), job_row_number=notification.get('row_number', None), - + notification_id=notification_id ) 
provider_tasks.deliver_email.apply_async( @@ -205,12 +208,17 @@ def send_email(self, service_id, current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at)) except SQLAlchemyError as e: - current_app.logger.exception("RETRY: send_email notification".format(notification.get('job', None), - notification.get('row_number', None)), e) - try: - raise self.retry(queue="retry", exc=e) - except self.MaxRetriesExceededError: - current_app.logger.error( - "RETRY FAILED: task send_email failed for notification".format( - notification.get('job', None), - notification.get('row_number', None)), e) + if not get_notification_by_id(notification_id): + # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems + # SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not + # send to the retry queue. + current_app.logger.exception("RETRY: send_email notification".format(notification.get('job', None), + notification.get('row_number', None)), + e) + try: + raise self.retry(queue="retry", exc=e) + except self.MaxRetriesExceededError: + current_app.logger.error( + "RETRY FAILED: task send_email failed for notification".format( + notification.get('job', None), + notification.get('row_number', None)), e) diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py index 1887eccc8..72b079011 100644 --- a/app/notifications/process_notifications.py +++ b/app/notifications/process_notifications.py @@ -4,7 +4,7 @@ from flask import current_app from notifications_utils.renderers import PassThrough from notifications_utils.template import Template -from app import DATETIME_FORMAT, redis_store +from app import redis_store from app.celery import provider_tasks from app.clients import redis from app.dao.notifications_dao import dao_create_notification, dao_delete_notifications_and_history_by_id @@ -48,8 +48,10 @@ def 
persist_notification(template_id, created_at=None, job_id=None, job_row_number=None, - reference=None): + reference=None, + notification_id=None): notification = Notification( + id=notification_id, template_id=template_id, template_version=template_version, to=recipient, diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index c17533296..0b9d255dc 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -806,3 +806,41 @@ def test_send_email_should_go_to_retry_queue_if_database_errors(sample_email_tem tasks.send_email.retry.assert_called_with(exc=expected_exception, queue='retry') assert Notification.query.count() == 0 + + +def test_send_email_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker): + json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1) + deliver_email = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async') + retry = mocker.patch('app.celery.tasks.send_email.retry', side_effect=Exception()) + now = datetime.utcnow() + + notification_id = sample_notification.id + + send_email( + sample_notification.service_id, + notification_id, + encryption.encrypt(json), + now.strftime(DATETIME_FORMAT) + ) + assert Notification.query.count() == 1 + assert not deliver_email.called + assert not retry.called + + +def test_send_sms_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker): + json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1) + deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async') + retry = mocker.patch('app.celery.tasks.send_sms.retry', side_effect=Exception()) + now = datetime.utcnow() + + notification_id = sample_notification.id + + send_sms( + sample_notification.service_id, + notification_id, + encryption.encrypt(json), + now.strftime(DATETIME_FORMAT) + ) + assert 
Notification.query.count() == 1 + assert not deliver_sms.called + assert not retry.called diff --git a/tests/app/notifications/test_process_notification.py b/tests/app/notifications/test_process_notification.py index 3badaa554..79db94c96 100644 --- a/tests/app/notifications/test_process_notification.py +++ b/tests/app/notifications/test_process_notification.py @@ -1,4 +1,5 @@ import datetime +import uuid import pytest from boto3.exceptions import Boto3Error @@ -108,7 +109,7 @@ def test_persist_notification_with_optionals(sample_job, sample_api_key, mocker) assert Notification.query.count() == 0 assert NotificationHistory.query.count() == 0 mocked_redis = mocker.patch('app.notifications.process_notifications.redis_store.incr') - + n_id = uuid.uuid4() created_at = datetime.datetime(2016, 11, 11, 16, 8, 18) persist_notification(template_id=sample_job.template.id, template_version=sample_job.template.version, @@ -120,11 +121,13 @@ def test_persist_notification_with_optionals(sample_job, sample_api_key, mocker) created_at=created_at, job_id=sample_job.id, job_row_number=10, - reference="ref from client") + reference="ref from client", + notification_id=n_id) assert Notification.query.count() == 1 assert NotificationHistory.query.count() == 1 persisted_notification = Notification.query.all()[0] - assert persisted_notification.job_id == sample_job.id + assert persisted_notification.id == n_id + persisted_notification.job_id == sample_job.id assert persisted_notification.job_row_number == 10 assert persisted_notification.created_at == created_at mocked_redis.assert_called_once_with(str(sample_job.service_id) + "-2016-01-01-count") diff --git a/tests/app/notifications/test_validators.py b/tests/app/notifications/test_validators.py index 2e1974359..51b1e56db 100644 --- a/tests/app/notifications/test_validators.py +++ b/tests/app/notifications/test_validators.py @@ -1,3 +1,4 @@ +from datetime import datetime import pytest from freezegun import freeze_time @@ -21,10 +22,9 @@ 
from tests.app.conftest import ( @pytest.mark.parametrize('key_type', ['team', 'normal']) -def test_exception_thown_by_redis_store_get_should_not_be_fatal( +def test_exception_thrown_by_redis_store_get_should_not_be_fatal( notify_db, notify_db_session, - notify_api, key_type, mocker): mocker.patch('app.notifications.validators.redis_store.redis_store.get', side_effect=Exception("broken redis")) @@ -39,7 +39,8 @@ def test_exception_thown_by_redis_store_get_should_not_be_fatal( assert e.value.status_code == 429 assert e.value.message == 'Exceeded send limits (4) for today' assert e.value.fields == [] - app.notifications.validators.redis_store.set.assert_not_called() + app.notifications.validators.redis_store.set\ + .assert_called_with("{}-{}-count".format(service.id, datetime.utcnow().strftime("%Y-%m-%d")), 5, ex=3600) @pytest.mark.parametrize('key_type', ['test', 'team', 'normal'])