modify inbound notif processing

This commit is contained in:
jimmoffet
2022-10-03 09:05:34 -07:00
parent 06c2727e65
commit 8cb6f60f04
22 changed files with 665 additions and 44 deletions

View File

@@ -9,8 +9,8 @@ from celery.exceptions import Retry
from flask import Blueprint, current_app, json, jsonify, request
from sqlalchemy.orm.exc import NoResultFound
from app import notify_celery, statsd_client, redis_store
from app.celery.validate_sns import validate_sns_message
from app import notify_celery, statsd_client
from app.celery.validate_sns_message import sns_notification_handler
from app.config import QueueNames
from app.dao import notifications_dao
from app.errors import InvalidRequest, register_errors
@@ -51,43 +51,15 @@ def verify_message_type(message_type: str):
# got refactored into a task, which is fine, but it created a circular dependency. Will need
# to investigate why GDS extracted this into a lambda
@ses_callback_blueprint.route('/notifications/email/ses', methods=['POST'])
def sns_callback_handler():
message_type = request.headers.get('x-amz-sns-message-type')
def email_ses_callback_handler():
try:
verify_message_type(message_type)
except InvalidMessageTypeException:
current_app.logger.exception(f"Response headers: {request.headers}\nResponse data: {request.data}")
data = sns_notification_handler(request.data, request.headers)
except Exception as e:
raise InvalidRequest("SES-SNS callback failed: invalid message type", 400)
try:
message = json.loads(request.data.decode('utf-8'))
except decoder.JSONDecodeError:
current_app.logger.exception(f"Response headers: {request.headers}\nResponse data: {request.data}")
raise InvalidRequest("SES-SNS callback failed: invalid JSON given", 400)
try:
validate_sns_message(message)
except Exception as err:
current_app.logger.error(f"SES-SNS callback failed: validation failed! Response headers: {request.headers}\nResponse data: {request.data}\nError: Signature validation failed with error {err}")
raise InvalidRequest("SES-SNS callback failed: validation failed", 400)
if message.get('Type') == 'SubscriptionConfirmation':
url = message.get('SubscribeUrl') if 'SubscribeUrl' in message else message.get('SubscribeURL')
response = requests.get(url)
try:
response.raise_for_status()
except Exception as e:
current_app.logger.warning(f"Attempt to raise_for_status()SubscriptionConfirmation Type message files for response: {response.text} with error {e}")
raise e
return jsonify(
result="success", message="SES-SNS auto-confirm callback succeeded"
), 200
# TODO remove after smoke testing on prod is implemented
current_app.logger.info(f"SNS message: {message} is a valid delivery status message. Attempting to process it now.")
process_ses_results.apply_async([{"Message": message.get("Message")}], queue=QueueNames.NOTIFY)
message = data.get("Message")
if "mail" in message:
process_ses_results.apply_async([{"Message": message}], queue=QueueNames.NOTIFY)
return jsonify(
result="success", message="SES-SNS callback succeeded"

View File

@@ -69,10 +69,11 @@ def get_string_to_sign(sns_payload):
return string_to_sign
def validate_sns_message(sns_payload):
def validate_sns_cert(sns_payload):
"""
Adapted from the solution posted at
https://github.com/boto/boto3/issues/2508#issuecomment-992931814
Modified to swap m2crypto for oscrypto
"""
if not isinstance(sns_payload, dict):
raise ValidationError("Unexpected message type {!r}".format(type(sns_payload).__name__))

View File

@@ -0,0 +1,66 @@
import enum
from datetime import timedelta
from json import decoder
import requests
from flask import current_app, json
from app.celery.validate_sns_cert import validate_sns_cert
from app.errors import InvalidRequest
DEFAULT_MAX_AGE = timedelta(days=10000)
class SNSMessageType(enum.Enum):
SubscriptionConfirmation = 'SubscriptionConfirmation'
Notification = 'Notification'
UnsubscribeConfirmation = 'UnsubscribeConfirmation'
class InvalidMessageTypeException(Exception):
pass
def verify_message_type(message_type: str):
try:
SNSMessageType(message_type)
except ValueError:
raise InvalidRequest("SES-SNS callback failed: invalid message type", 400)
def sns_notification_handler(data, headers):
message_type = headers.get('x-amz-sns-message-type')
try:
verify_message_type(message_type)
except InvalidMessageTypeException:
current_app.logger.exception(f"Response headers: {headers}\nResponse data: {data}")
raise InvalidRequest("SES-SNS callback failed: invalid message type", 400)
try:
message = json.loads(data.decode('utf-8'))
except decoder.JSONDecodeError:
current_app.logger.exception(f"Response headers: {headers}\nResponse data: {data}")
raise InvalidRequest("SES-SNS callback failed: invalid JSON given", 400)
try:
validate_sns_cert(message)
except Exception as e:
current_app.logger.error(f"SES-SNS callback failed: validation failed with error: Signature validation failed with error {e}")
raise InvalidRequest("SES-SNS callback failed: validation failed", 400)
if message.get('Type') == 'SubscriptionConfirmation':
url = message.get('SubscribeUrl') if 'SubscribeUrl' in message else message.get('SubscribeURL')
response = requests.get(url)
try:
response.raise_for_status()
except Exception as e:
current_app.logger.warning(f"Attempt to raise_for_status()SubscriptionConfirmation Type message files for response: {response.text} with error {e}")
raise InvalidRequest("SES-SNS callback failed: attempt to raise_for_status()SubscriptionConfirmation Type message failed", 400)
current_app.logger.info("SES-SNS auto-confirm subscription callback succeeded")
return message
# TODO remove after smoke testing on prod is implemented
current_app.logger.info(f"SNS message: {message} is a valid message. Attempting to process it now.")
return message