mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-30 14:31:57 -05:00
Add retries for SES callbacks for recent notifications
We've seen errors caused by what we suspect is a race condition when SES callback processing tries to look up the notification before the sender worker has saved notification reference from the SES POST response to the database. This adds a retry for SES callback task if the notification was not found and the message is less than 10 minutes old and removes the error log message for notifications older than 3 days (since they might no longer exist in the notifications table and would've been marked as failure by then either way). In order to be able to call retry and silence the error log based on notification time this change inlines `process_ses_response` and `update_notification_by_reference` functions into the celery task. It also removes a lot of defensive error-handling that doesn't appear to have been triggered in the last few months (for things like missing keys in SES callback data).
This commit is contained in:
@@ -1,19 +1,80 @@
|
||||
from flask import current_app
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import iso8601
|
||||
from flask import current_app, json
|
||||
from notifications_utils.statsd_decorators import statsd
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from app import notify_celery
|
||||
from app import notify_celery, statsd_client
|
||||
from app.config import QueueNames
|
||||
from app.clients.email.aws_ses import get_aws_responses
|
||||
from app.dao import notifications_dao
|
||||
from app.models import NOTIFICATION_SENDING, NOTIFICATION_PENDING
|
||||
|
||||
from app.notifications.notifications_ses_callback import process_ses_response
|
||||
from app.notifications.notifications_ses_callback import (
|
||||
determine_notification_bounce_type,
|
||||
handle_complaint,
|
||||
_check_and_queue_complaint_callback_task,
|
||||
_check_and_queue_callback_task,
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
|
||||
@statsd(namespace="tasks")
|
||||
def process_ses_results(self, response):
|
||||
try:
|
||||
errors = process_ses_response(response)
|
||||
if errors:
|
||||
current_app.logger.error(errors)
|
||||
except Exception:
|
||||
current_app.logger.exception('Error processing SES results')
|
||||
ses_message = json.loads(response['Message'])
|
||||
notification_type = ses_message['notificationType']
|
||||
|
||||
if notification_type == 'Bounce':
|
||||
notification_type = determine_notification_bounce_type(notification_type, ses_message)
|
||||
elif notification_type == 'Complaint':
|
||||
_check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
|
||||
return True
|
||||
|
||||
aws_response_dict = get_aws_responses(notification_type)
|
||||
|
||||
notification_status = aws_response_dict['notification_status']
|
||||
reference = ses_message['mail']['messageId']
|
||||
|
||||
try:
|
||||
notification = notifications_dao.dao_get_notification_by_reference(reference)
|
||||
except NoResultFound:
|
||||
message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None)
|
||||
if datetime.utcnow() - message_time < timedelta(minutes=10):
|
||||
self.retry(queue=QueueNames.RETRY)
|
||||
elif datetime.utcnow() - message_time < timedelta(days=3):
|
||||
current_app.logger.error(
|
||||
"notification not found for reference: {} (update to {})".format(reference, notification_status)
|
||||
)
|
||||
return
|
||||
|
||||
if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
|
||||
notifications_dao._duplicate_update_warning(notification, notification_status)
|
||||
return
|
||||
|
||||
notifications_dao._update_notification_status(notification=notification, status=notification_status)
|
||||
|
||||
if not aws_response_dict['success']:
|
||||
current_app.logger.info(
|
||||
"SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
|
||||
notification.id, reference, aws_response_dict['message']
|
||||
)
|
||||
)
|
||||
else:
|
||||
current_app.logger.info('SES callback return status of {} for notification: {}'.format(
|
||||
notification_status, notification.id
|
||||
))
|
||||
|
||||
statsd_client.incr('callback.ses.{}'.format(notification_status))
|
||||
|
||||
if notification.sent_at:
|
||||
statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at)
|
||||
|
||||
_check_and_queue_callback_task(notification)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
current_app.logger.exception('Error processing SES results: {}'.format(type(e)))
|
||||
self.retry(queue=QueueNames.RETRY)
|
||||
|
||||
@@ -1,22 +1,11 @@
|
||||
from datetime import datetime
|
||||
from flask import current_app
|
||||
|
||||
from flask import (
|
||||
current_app,
|
||||
json
|
||||
)
|
||||
|
||||
from app import statsd_client
|
||||
from app.clients.email.aws_ses import get_aws_responses
|
||||
from app.dao import (
|
||||
notifications_dao
|
||||
)
|
||||
from app.dao.complaint_dao import save_complaint
|
||||
from app.dao.notifications_dao import dao_get_notification_history_by_reference
|
||||
from app.dao.service_callback_api_dao import (
|
||||
get_service_delivery_status_callback_api_for_service, get_service_complaint_callback_api_for_service
|
||||
)
|
||||
from app.models import Complaint
|
||||
from app.notifications.process_client_response import validate_callback_data
|
||||
from app.celery.service_callback_tasks import (
|
||||
send_delivery_status_to_service,
|
||||
send_complaint_to_service,
|
||||
@@ -26,76 +15,6 @@ from app.celery.service_callback_tasks import (
|
||||
from app.config import QueueNames
|
||||
|
||||
|
||||
def process_ses_response(ses_request):
|
||||
client_name = 'SES'
|
||||
try:
|
||||
errors = validate_callback_data(data=ses_request, fields=['Message'], client_name=client_name)
|
||||
if errors:
|
||||
return errors
|
||||
|
||||
ses_message = json.loads(ses_request['Message'])
|
||||
errors = validate_callback_data(data=ses_message, fields=['notificationType'], client_name=client_name)
|
||||
if errors:
|
||||
return errors
|
||||
|
||||
notification_type = ses_message['notificationType']
|
||||
if notification_type == 'Bounce':
|
||||
notification_type = determine_notification_bounce_type(notification_type, ses_message)
|
||||
elif notification_type == 'Complaint':
|
||||
complaint, notification, recipient = handle_complaint(ses_message)
|
||||
_check_and_queue_complaint_callback_task(complaint, notification, recipient)
|
||||
return
|
||||
|
||||
try:
|
||||
aws_response_dict = get_aws_responses(notification_type)
|
||||
except KeyError:
|
||||
error = "{} callback failed: status {} not found".format(client_name, notification_type)
|
||||
return error
|
||||
|
||||
notification_status = aws_response_dict['notification_status']
|
||||
|
||||
try:
|
||||
reference = ses_message['mail']['messageId']
|
||||
notification = notifications_dao.update_notification_status_by_reference(
|
||||
reference,
|
||||
notification_status
|
||||
)
|
||||
if not notification:
|
||||
return
|
||||
|
||||
if not aws_response_dict['success']:
|
||||
current_app.logger.info(
|
||||
"SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
|
||||
notification.id,
|
||||
reference,
|
||||
aws_response_dict['message']
|
||||
)
|
||||
)
|
||||
else:
|
||||
current_app.logger.info('{} callback return status of {} for notification: {}'.format(
|
||||
client_name,
|
||||
notification_status,
|
||||
notification.id))
|
||||
statsd_client.incr('callback.ses.{}'.format(notification_status))
|
||||
if notification.sent_at:
|
||||
statsd_client.timing_with_dates(
|
||||
'callback.ses.elapsed-time'.format(client_name.lower()),
|
||||
datetime.utcnow(),
|
||||
notification.sent_at
|
||||
)
|
||||
|
||||
_check_and_queue_callback_task(notification)
|
||||
return
|
||||
|
||||
except KeyError:
|
||||
error = "SES callback failed: messageId missing"
|
||||
return error
|
||||
|
||||
except ValueError:
|
||||
error = "{} callback failed: invalid json".format(client_name)
|
||||
return error
|
||||
|
||||
|
||||
def determine_notification_bounce_type(notification_type, ses_message):
|
||||
remove_emails_from_bounce(ses_message)
|
||||
current_app.logger.info('SES bounce dict: {}'.format(ses_message))
|
||||
|
||||
Reference in New Issue
Block a user