mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-29 03:42:07 -05:00
Celery `self.retry` raises an exception to communicate that the task needs to be retried. Since our ses task is wrapped in a catch-all except block it logs that exception as an error before retrying. Handling Retry class separately allows us to raise it without logging the traceback.
85 lines
3.3 KiB
Python
85 lines
3.3 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
import iso8601
|
|
from celery.exceptions import Retry
|
|
from flask import current_app, json
|
|
from notifications_utils.statsd_decorators import statsd
|
|
from sqlalchemy.orm.exc import NoResultFound
|
|
|
|
from app import notify_celery, statsd_client
|
|
from app.config import QueueNames
|
|
from app.clients.email.aws_ses import get_aws_responses
|
|
from app.dao import notifications_dao
|
|
from app.models import NOTIFICATION_SENDING, NOTIFICATION_PENDING
|
|
|
|
from app.notifications.notifications_ses_callback import (
|
|
determine_notification_bounce_type,
|
|
handle_complaint,
|
|
_check_and_queue_complaint_callback_task,
|
|
_check_and_queue_callback_task,
|
|
)
|
|
|
|
|
|
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
|
|
@statsd(namespace="tasks")
|
|
def process_ses_results(self, response):
|
|
try:
|
|
ses_message = json.loads(response['Message'])
|
|
notification_type = ses_message['notificationType']
|
|
|
|
if notification_type == 'Bounce':
|
|
notification_type = determine_notification_bounce_type(notification_type, ses_message)
|
|
elif notification_type == 'Complaint':
|
|
_check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
|
|
return True
|
|
|
|
aws_response_dict = get_aws_responses(notification_type)
|
|
|
|
notification_status = aws_response_dict['notification_status']
|
|
reference = ses_message['mail']['messageId']
|
|
|
|
try:
|
|
notification = notifications_dao.dao_get_notification_by_reference(reference)
|
|
except NoResultFound:
|
|
message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None)
|
|
if datetime.utcnow() - message_time < timedelta(minutes=10):
|
|
self.retry(queue=QueueNames.RETRY)
|
|
elif datetime.utcnow() - message_time < timedelta(days=3):
|
|
current_app.logger.error(
|
|
"notification not found for reference: {} (update to {})".format(reference, notification_status)
|
|
)
|
|
return
|
|
|
|
if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
|
|
notifications_dao._duplicate_update_warning(notification, notification_status)
|
|
return
|
|
|
|
notifications_dao._update_notification_status(notification=notification, status=notification_status)
|
|
|
|
if not aws_response_dict['success']:
|
|
current_app.logger.info(
|
|
"SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
|
|
notification.id, reference, aws_response_dict['message']
|
|
)
|
|
)
|
|
else:
|
|
current_app.logger.info('SES callback return status of {} for notification: {}'.format(
|
|
notification_status, notification.id
|
|
))
|
|
|
|
statsd_client.incr('callback.ses.{}'.format(notification_status))
|
|
|
|
if notification.sent_at:
|
|
statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at)
|
|
|
|
_check_and_queue_callback_task(notification)
|
|
|
|
return True
|
|
|
|
except Retry:
|
|
raise
|
|
|
|
except Exception as e:
|
|
current_app.logger.exception('Error processing SES results: {}'.format(type(e)))
|
|
self.retry(queue=QueueNames.RETRY)
|