Files
notifications-api/app/celery/process_ses_receipts_tasks.py
Alexey Bezhan 6f5822ae5b Downgrade log level for missing notifications in SES receipt
The timestamps available in the SES receipt don't always correspond
to the time the notification was sent. We've seen callbacks with
a current timestamp in both 'mail' and 'bounce' objects that referenced
a notification sent a week ago, which means we can't rely on them to skip
archived notifications.

One possible approach would be to look up the notification reference in
the notification_history table, but this goes against our plans to stop
relying on it in the future.

This changes the SES receipts logic to retry missing notifications once
(if the callback timestamp is within the last 5 minutes the task will
retry after a 5 minute delay) to capture callbacks arriving before the
notification reference has been persisted to the DB. Otherwise, we log
the missing notification as a warning instead of an error.
2019-03-06 11:35:32 +00:00

85 lines
3.2 KiB
Python

from datetime import datetime, timedelta
import iso8601
from celery.exceptions import Retry
from flask import current_app, json
from notifications_utils.statsd_decorators import statsd
from sqlalchemy.orm.exc import NoResultFound
from app import notify_celery, statsd_client
from app.config import QueueNames
from app.clients.email.aws_ses import get_aws_responses
from app.dao import notifications_dao
from app.models import NOTIFICATION_SENDING, NOTIFICATION_PENDING
from app.notifications.notifications_ses_callback import (
determine_notification_bounce_type,
handle_complaint,
_check_and_queue_complaint_callback_task,
_check_and_queue_callback_task,
)
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
@statsd(namespace="tasks")
def process_ses_results(self, response):
    """Apply an SES receipt to the notification it refers to.

    ``response`` must be subscriptable with a ``'Message'`` entry holding the
    JSON-encoded SES message. The message's ``notificationType`` selects the
    handling:

    * ``'Complaint'`` receipts are passed through ``handle_complaint`` and
      ``_check_and_queue_complaint_callback_task``; nothing else is updated.
    * ``'Bounce'`` receipts are first refined by
      ``determine_notification_bounce_type``.
    * Every other receipt (and refined bounces) is looked up by
      ``mail.messageId``, and the matching notification is moved to the status
      given by ``get_aws_responses``.

    Returns ``True`` once a receipt has been fully handled. Returns ``None``
    when the notification cannot be found (after logging a warning or
    requesting a retry) or when the notification is no longer in a
    sending/pending state.

    Any exception other than ``Retry`` is logged and the task is re-queued on
    ``QueueNames.RETRY``; a ``Retry`` raised inside the body is re-raised as-is.
    """
    try:
        ses_message = json.loads(response['Message'])
        notification_type = ses_message['notificationType']

        if notification_type == 'Bounce':
            notification_type = determine_notification_bounce_type(notification_type, ses_message)
        elif notification_type == 'Complaint':
            _check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
            return True

        aws_response_dict = get_aws_responses(notification_type)
        notification_status = aws_response_dict['notification_status']
        reference = ses_message['mail']['messageId']

        try:
            notification = notifications_dao.dao_get_notification_by_reference(reference)
        except NoResultFound:
            # The receipt may have arrived before the reference was persisted.
            # Convert the receipt timestamp to naive UTC (rather than merely
            # dropping its tzinfo) so the comparison with utcnow() stays correct
            # even if the timestamp carries a non-zero UTC offset.
            parsed_time = iso8601.parse_date(ses_message['mail']['timestamp'])
            utc_offset = parsed_time.utcoffset() or timedelta(0)
            message_time = parsed_time.replace(tzinfo=None) - utc_offset

            if datetime.utcnow() - message_time < timedelta(minutes=5):
                # Recent receipt: give the notification time to be saved.
                self.retry(queue=QueueNames.RETRY)
            else:
                current_app.logger.warning(
                    "notification not found for reference: {} (update to {})".format(reference, notification_status)
                )
            return

        # Only notifications still awaiting a result may be updated; anything
        # else is reported as a duplicate update.
        if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
            notifications_dao._duplicate_update_warning(notification, notification_status)
            return

        notifications_dao._update_notification_status(notification=notification, status=notification_status)

        if not aws_response_dict['success']:
            current_app.logger.info(
                "SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
                    notification.id, reference, aws_response_dict['message']
                )
            )
        else:
            current_app.logger.info('SES callback return status of {} for notification: {}'.format(
                notification_status, notification.id
            ))

        statsd_client.incr('callback.ses.{}'.format(notification_status))

        if notification.sent_at:
            statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at)

        _check_and_queue_callback_task(notification)

        return True

    except Retry:
        # A retry requested above must propagate untouched; without this clause
        # the generic handler below would catch it and request another retry.
        raise

    except Exception as e:
        current_app.logger.exception('Error processing SES results: {}'.format(type(e)))
        self.retry(queue=QueueNames.RETRY)