mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-02 08:35:15 -05:00
Merge pull request #747 from alphagov/dont-send-message-twice
Protect send_sms send_email task from being executed twice
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
from datetime import (datetime)
|
||||
|
||||
from flask import current_app
|
||||
from notifications_utils.recipients import (
|
||||
RecipientCSV
|
||||
@@ -17,6 +18,7 @@ from app.dao.jobs_dao import (
|
||||
dao_update_job,
|
||||
dao_get_job_by_id
|
||||
)
|
||||
from app.dao.notifications_dao import get_notification_by_id
|
||||
from app.dao.services_dao import dao_fetch_service_by_id, fetch_todays_total_message_count
|
||||
from app.dao.templates_dao import dao_get_template_by_id
|
||||
from app.models import (
|
||||
@@ -27,8 +29,6 @@ from app.models import (
|
||||
from app.notifications.process_notifications import persist_notification
|
||||
from app.service.utils import service_allowed_to_send_to
|
||||
from app.statsd_decorators import statsd
|
||||
from app import redis_store
|
||||
from app.clients.redis import daily_limit_cache_key
|
||||
|
||||
|
||||
@notify_celery.task(name="process-job")
|
||||
@@ -119,7 +119,6 @@ def send_sms(self,
|
||||
created_at,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL):
|
||||
# notification_id is not used, it is created by the db model object
|
||||
notification = encryption.decrypt(encrypted_notification)
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
|
||||
@@ -141,6 +140,7 @@ def send_sms(self,
|
||||
created_at=created_at,
|
||||
job_id=notification.get('job', None),
|
||||
job_row_number=notification.get('row_number', None),
|
||||
notification_id=notification_id
|
||||
)
|
||||
|
||||
provider_tasks.deliver_sms.apply_async(
|
||||
@@ -153,17 +153,25 @@ def send_sms(self,
|
||||
)
|
||||
|
||||
except SQLAlchemyError as e:
|
||||
current_app.logger.exception(
|
||||
"RETRY: send_sms notification for job {} row number {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None)), e)
|
||||
try:
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.exception(
|
||||
"RETRY FAILED: task send_sms failed for notification".format(
|
||||
if not get_notification_by_id(notification_id):
|
||||
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
|
||||
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
|
||||
# send to the retry queue.
|
||||
current_app.logger.error(
|
||||
"RETRY: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None)), e)
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
try:
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.error(
|
||||
"RETRY FAILED: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="send-email", max_retries=5, default_retry_delay=300)
|
||||
@@ -174,7 +182,6 @@ def send_email(self, service_id,
|
||||
created_at,
|
||||
api_key_id=None,
|
||||
key_type=KEY_TYPE_NORMAL):
|
||||
# notification_id is not used, it is created by the db model object
|
||||
notification = encryption.decrypt(encrypted_notification)
|
||||
service = dao_fetch_service_by_id(service_id)
|
||||
|
||||
@@ -195,7 +202,7 @@ def send_email(self, service_id,
|
||||
created_at=created_at,
|
||||
job_id=notification.get('job', None),
|
||||
job_row_number=notification.get('row_number', None),
|
||||
|
||||
notification_id=notification_id
|
||||
)
|
||||
|
||||
provider_tasks.deliver_email.apply_async(
|
||||
@@ -205,12 +212,22 @@ def send_email(self, service_id,
|
||||
|
||||
current_app.logger.info("Email {} created at {}".format(saved_notification.id, created_at))
|
||||
except SQLAlchemyError as e:
|
||||
current_app.logger.exception("RETRY: send_email notification".format(notification.get('job', None),
|
||||
notification.get('row_number', None)), e)
|
||||
try:
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
except self.MaxRetriesExceededError:
|
||||
if not get_notification_by_id(notification_id):
|
||||
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
|
||||
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
|
||||
# send to the retry queue.
|
||||
current_app.logger.error(
|
||||
"RETRY FAILED: task send_email failed for notification".format(
|
||||
"RETRY: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None)), e)
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
try:
|
||||
raise self.retry(queue="retry", exc=e)
|
||||
except self.MaxRetriesExceededError:
|
||||
current_app.logger.error(
|
||||
"RETRY FAILED: send_sms notification for job {} row number {} and notification id {}".format(
|
||||
notification.get('job', None),
|
||||
notification.get('row_number', None),
|
||||
notification_id))
|
||||
current_app.logger.exception(e)
|
||||
|
||||
@@ -4,7 +4,7 @@ from flask import current_app
|
||||
from notifications_utils.renderers import PassThrough
|
||||
from notifications_utils.template import Template
|
||||
|
||||
from app import DATETIME_FORMAT, redis_store
|
||||
from app import redis_store
|
||||
from app.celery import provider_tasks
|
||||
from app.clients import redis
|
||||
from app.dao.notifications_dao import dao_create_notification, dao_delete_notifications_and_history_by_id
|
||||
@@ -48,8 +48,10 @@ def persist_notification(template_id,
|
||||
created_at=None,
|
||||
job_id=None,
|
||||
job_row_number=None,
|
||||
reference=None):
|
||||
reference=None,
|
||||
notification_id=None):
|
||||
notification = Notification(
|
||||
id=notification_id,
|
||||
template_id=template_id,
|
||||
template_version=template_version,
|
||||
to=recipient,
|
||||
|
||||
@@ -806,3 +806,41 @@ def test_send_email_should_go_to_retry_queue_if_database_errors(sample_email_tem
|
||||
tasks.send_email.retry.assert_called_with(exc=expected_exception, queue='retry')
|
||||
|
||||
assert Notification.query.count() == 0
|
||||
|
||||
|
||||
def test_send_email_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker):
|
||||
json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1)
|
||||
deliver_email = mocker.patch('app.celery.provider_tasks.deliver_email.apply_async')
|
||||
retry = mocker.patch('app.celery.tasks.send_email.retry', side_effect=Exception())
|
||||
now = datetime.utcnow()
|
||||
|
||||
notification_id = sample_notification.id
|
||||
|
||||
send_email(
|
||||
sample_notification.service_id,
|
||||
notification_id,
|
||||
encryption.encrypt(json),
|
||||
now.strftime(DATETIME_FORMAT)
|
||||
)
|
||||
assert Notification.query.count() == 1
|
||||
assert not deliver_email.called
|
||||
assert not retry.called
|
||||
|
||||
|
||||
def test_send_sms_does_not_send_duplicate_and_does_not_put_in_retry_queue(sample_notification, mocker):
|
||||
json = _notification_json(sample_notification.template, sample_notification.to, job_id=uuid.uuid4(), row_number=1)
|
||||
deliver_sms = mocker.patch('app.celery.provider_tasks.deliver_sms.apply_async')
|
||||
retry = mocker.patch('app.celery.tasks.send_sms.retry', side_effect=Exception())
|
||||
now = datetime.utcnow()
|
||||
|
||||
notification_id = sample_notification.id
|
||||
|
||||
send_sms(
|
||||
sample_notification.service_id,
|
||||
notification_id,
|
||||
encryption.encrypt(json),
|
||||
now.strftime(DATETIME_FORMAT)
|
||||
)
|
||||
assert Notification.query.count() == 1
|
||||
assert not deliver_sms.called
|
||||
assert not retry.called
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from boto3.exceptions import Boto3Error
|
||||
@@ -108,7 +109,7 @@ def test_persist_notification_with_optionals(sample_job, sample_api_key, mocker)
|
||||
assert Notification.query.count() == 0
|
||||
assert NotificationHistory.query.count() == 0
|
||||
mocked_redis = mocker.patch('app.notifications.process_notifications.redis_store.incr')
|
||||
|
||||
n_id = uuid.uuid4()
|
||||
created_at = datetime.datetime(2016, 11, 11, 16, 8, 18)
|
||||
persist_notification(template_id=sample_job.template.id,
|
||||
template_version=sample_job.template.version,
|
||||
@@ -120,11 +121,13 @@ def test_persist_notification_with_optionals(sample_job, sample_api_key, mocker)
|
||||
created_at=created_at,
|
||||
job_id=sample_job.id,
|
||||
job_row_number=10,
|
||||
reference="ref from client")
|
||||
reference="ref from client",
|
||||
notification_id=n_id)
|
||||
assert Notification.query.count() == 1
|
||||
assert NotificationHistory.query.count() == 1
|
||||
persisted_notification = Notification.query.all()[0]
|
||||
assert persisted_notification.job_id == sample_job.id
|
||||
assert persisted_notification.id == n_id
|
||||
persisted_notification.job_id == sample_job.id
|
||||
assert persisted_notification.job_row_number == 10
|
||||
assert persisted_notification.created_at == created_at
|
||||
mocked_redis.assert_called_once_with(str(sample_job.service_id) + "-2016-01-01-count")
|
||||
|
||||
Reference in New Issue
Block a user