Files
notifications-api/app/celery/process_ses_receipts_tasks.py

148 lines
5.7 KiB
Python
Raw Normal View History

2022-09-15 14:59:13 -07:00
import enum
2022-09-30 10:59:48 -04:00
import traceback
from datetime import datetime, timedelta
2022-09-15 14:59:13 -07:00
from json import decoder
import iso8601
2022-09-15 14:59:13 -07:00
import requests
from celery.exceptions import Retry
2022-09-15 14:59:13 -07:00
from flask import Blueprint, current_app, json, jsonify, request
from sqlalchemy.orm.exc import NoResultFound
2022-10-03 09:05:34 -07:00
from app import notify_celery, statsd_client
from app.celery.validate_sns_message import sns_notification_handler
2021-03-10 13:55:06 +00:00
from app.config import QueueNames
from app.dao import notifications_dao
2022-09-15 14:59:13 -07:00
from app.errors import InvalidRequest, register_errors
2021-03-10 13:55:06 +00:00
from app.models import NOTIFICATION_PENDING, NOTIFICATION_SENDING
from app.notifications.notifications_ses_callback import (
2021-03-10 13:55:06 +00:00
_check_and_queue_complaint_callback_task,
check_and_queue_callback_task,
determine_notification_bounce_type,
2022-09-15 14:59:13 -07:00
get_aws_responses,
handle_complaint,
)
2022-09-15 14:59:13 -07:00
# Blueprint carrying the SES -> SNS callback route defined further down in this module.
ses_callback_blueprint = Blueprint('notifications_ses_callback', __name__)

# NOTE(review): not referenced in the visible part of this module; kept because other
# modules may import it -- confirm before removing.
DEFAULT_MAX_AGE = timedelta(days=10000)

# register_errors comes from app.errors; presumably it installs the shared error
# handlers (e.g. for InvalidRequest) on the blueprint -- verify against app.errors.
register_errors(ses_callback_blueprint)
@enum.unique
class SNSMessageType(enum.Enum):
    """Message types accepted for an incoming SNS message.

    Each member's value is the same string as its name, so
    ``SNSMessageType(raw_type)`` validates a raw type string and raises
    ``ValueError`` for anything that is not listed here.
    """

    SubscriptionConfirmation = 'SubscriptionConfirmation'
    Notification = 'Notification'
    UnsubscribeConfirmation = 'UnsubscribeConfirmation'
class InvalidMessageTypeException(Exception):
    """Raised when a message type string is not a value of ``SNSMessageType``."""
def verify_message_type(message_type: str) -> None:
    """Check that ``message_type`` is a known SNS message type.

    Args:
        message_type: Raw message-type string to validate.

    Raises:
        InvalidMessageTypeException: If ``message_type`` is not the value of
            any ``SNSMessageType`` member.
    """
    try:
        SNSMessageType(message_type)
    except ValueError as err:
        # Chain the enum lookup failure so the traceback keeps the root cause.
        raise InvalidMessageTypeException(f'{message_type} is not a valid message type.') from err
# 400 counts as a permanent failure so SNS will not retry.
# 500 counts as a failed delivery attempt so SNS will retry.
# See https://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html#DeliveryPolicies
# NOTE: This should not be here. It used to be in notifications/notifications_ses_callback; it was
# then refactored into a task, which is fine, but that created a circular dependency.
# TODO: investigate why GDS extracted this into a lambda.
@ses_callback_blueprint.route('/notifications/email/ses', methods=['POST'])
def email_ses_callback_handler():
    """Handle an SES receipt POSTed by SNS and queue it for processing.

    The raw request body and headers are passed to ``sns_notification_handler``.
    If the returned payload has a ``Message`` that contains "mail", that message
    is handed to the ``process_ses_results`` task on the NOTIFY queue.

    Returns:
        A ``(json_response, 200)`` tuple reporting success.

    Raises:
        InvalidRequest: With status 400 if ``sns_notification_handler`` raises.
    """
    try:
        data = sns_notification_handler(request.data, request.headers)
    except Exception as e:
        # Any handler failure is answered with 400, which SNS counts as a permanent
        # failure and does not retry. Chain the cause so it is not lost.
        raise InvalidRequest("SES-SNS callback failed: invalid message type", 400) from e

    message = data.get("Message")
    # A payload without "Message" yields None; guard the substring test so it cannot
    # raise TypeError and surface as a 500 that SNS would keep retrying.
    if message and "mail" in message:
        process_ses_results.apply_async([{"Message": message}], queue=QueueNames.NOTIFY)

    return jsonify(
        result="success", message="SES-SNS callback succeeded"
    ), 200
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
def process_ses_results(self, response):
    """Apply an SES delivery-status message to the matching notification.

    Args:
        response: Mapping whose ``"Message"`` entry is the JSON-encoded SES
            message (it must provide ``notificationType`` and ``mail``).

    Returns:
        ``True`` when a complaint was queued or the notification status was
        updated; ``None`` when the notification could not be found or is no
        longer in a sending/pending state.

    Raises:
        Retry: Propagated from ``self.retry`` when the task is re-queued,
            either because the notification is not in the DB yet or because
            an unexpected error occurred.
    """
    try:
        ses_message = json.loads(response["Message"])
        notification_type = ses_message["notificationType"]
        # TODO remove after smoke testing on prod is implemented
        current_app.logger.info(f"Attempting to process SES delivery status message from SNS with type: {notification_type} and body: {ses_message}")
        bounce_message = None

        if notification_type == 'Bounce':
            bounce_message = determine_notification_bounce_type(ses_message)
        elif notification_type == 'Complaint':
            # Complaints are only forwarded to the complaint callback; no status update.
            _check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
            return True

        aws_response_dict = get_aws_responses(ses_message)

        notification_status = aws_response_dict["notification_status"]
        reference = ses_message["mail"]["messageId"]

        try:
            notification = notifications_dao.dao_get_notification_by_reference(reference)
        except NoResultFound:
            # Convert to UTC *before* dropping tzinfo so a timestamp carrying a non-UTC
            # offset is still compared correctly against the naive utcnow() below.
            message_time = (
                iso8601.parse_date(ses_message["mail"]["timestamp"])
                .astimezone(iso8601.UTC)
                .replace(tzinfo=None)
            )
            if datetime.utcnow() - message_time < timedelta(minutes=5):
                current_app.logger.info(
                    f"notification not found for reference: {reference} (while attempting update to {notification_status}). "
                    "Callback may have arrived before notification was persisted to the DB. Adding task to retry queue"
                )
                self.retry(queue=QueueNames.RETRY)
            else:
                # Message is older than five minutes: give up instead of retrying.
                current_app.logger.warning(
                    "notification not found for reference: {} (while attempting update to {})".format(reference, notification_status)
                )
            return

        if bounce_message:
            current_app.logger.info(f"SES bounce for notification ID {notification.id}: {bounce_message}")

        # Only notifications still in flight may be updated; anything else is a duplicate.
        if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
            notifications_dao._duplicate_update_warning(
                notification,
                notification_status
            )
            return

        notifications_dao._update_notification_status(
            notification=notification,
            status=notification_status,
            provider_response=aws_response_dict["provider_response"],
        )

        if not aws_response_dict["success"]:
            current_app.logger.info(
                "SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
                    notification.id, reference, aws_response_dict["message"]
                )
            )
        else:
            current_app.logger.info(
                "SES callback return status of {} for notification: {}".format(notification_status, notification.id)
            )

        statsd_client.incr("callback.ses.{}".format(notification_status))

        if notification.sent_at:
            statsd_client.timing_with_dates("callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at)

        check_and_queue_callback_task(notification)

        return True

    except Retry:
        # Let the retry requested above propagate instead of being caught by the
        # broad handler below and triggering a second retry.
        raise

    except Exception as e:
        current_app.logger.exception("Error processing SES results: {}".format(type(e)))
        self.retry(queue=QueueNames.RETRY)
2022-09-15 14:59:13 -07:00