fixing tests

This commit is contained in:
jimmoffet
2022-10-03 17:16:59 -07:00
parent c7ccc3b0dd
commit c04d1df6b3
14 changed files with 279 additions and 294 deletions

View File

@@ -1,5 +1,9 @@
from flask import current_app, json
import enum
from datetime import timedelta
from flask import Blueprint, current_app, json, jsonify, request
from app.celery.process_ses_receipts_tasks import process_ses_results
from app.celery.service_callback_tasks import (
create_complaint_callback_data,
create_delivery_status_callback_data,
@@ -13,127 +17,28 @@ from app.dao.service_callback_api_dao import (
get_service_complaint_callback_api_for_service,
get_service_delivery_status_callback_api_for_service,
)
from app.errors import InvalidRequest
from app.models import Complaint
from app.notifications.callbacks import create_complaint_callback_data
from app.notifications.sns_handlers import sns_notification_handler
ses_callback_blueprint = Blueprint('notifications_ses_callback', __name__)
DEFAULT_MAX_AGE = timedelta(days=10000)
def determine_notification_bounce_type(ses_message):
notification_type = ses_message["notificationType"]
if notification_type in ["Delivery", "Complaint"]:
return notification_type
if notification_type != "Bounce":
raise KeyError(f"Unhandled notification type {notification_type}")
remove_emails_from_bounce(ses_message)
current_app.logger.info("SES bounce dict: {}".format(json.dumps(ses_message).replace("{", "(").replace("}", ")")))
if ses_message["bounce"]["bounceType"] == "Permanent":
return "Permanent"
return "Temporary"
def _determine_provider_response(ses_message):
if ses_message["notificationType"] != "Bounce":
return None
bounce_type = ses_message["bounce"]["bounceType"]
bounce_subtype = ses_message["bounce"]["bounceSubType"]
# See https://docs.aws.amazon.com/ses/latest/DeveloperGuide/event-publishing-retrieving-sns-contents.html
if bounce_type == "Permanent" and bounce_subtype == "Suppressed":
return "The email address is on our email provider suppression list"
elif bounce_type == "Permanent" and bounce_subtype == "OnAccountSuppressionList":
return "The email address is on the GC Notify suppression list"
elif bounce_type == "Transient" and bounce_subtype == "AttachmentRejected":
return "The email was rejected because of its attachments"
return None
def get_aws_responses(ses_message):
status = determine_notification_bounce_type(ses_message)
base = {
"Permanent": {
"message": "Hard bounced",
"success": False,
"notification_status": "permanent-failure",
},
"Temporary": {
"message": "Soft bounced",
"success": False,
"notification_status": "temporary-failure",
},
"Delivery": {
"message": "Delivered",
"success": True,
"notification_status": "delivered",
},
"Complaint": {
"message": "Complaint",
"success": True,
"notification_status": "delivered",
},
}[status]
base["provider_response"] = _determine_provider_response(ses_message)
return base
def handle_complaint(ses_message):
recipient_email = remove_emails_from_complaint(ses_message)[0]
current_app.logger.info("Complaint from SES: \n{}".format(json.dumps(ses_message).replace("{", "(").replace("}", ")")))
# 400 counts as a permanent failure so SNS will not retry.
# 500 counts as a failed delivery attempt so SNS will retry.
# See https://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html#DeliveryPolicies
@ses_callback_blueprint.route('/notifications/email/ses', methods=['POST'])
def email_ses_callback_handler():
try:
reference = ses_message["mail"]["messageId"]
except KeyError as e:
current_app.logger.exception("Complaint from SES failed to get reference from message", e)
return
notification = dao_get_notification_history_by_reference(reference)
ses_complaint = ses_message.get("complaint", None)
data = sns_notification_handler(request.data, request.headers)
except Exception as e:
raise InvalidRequest("SES-SNS callback failed: invalid message type", 400)
message = data.get("Message")
if "mail" in message:
process_ses_results.apply_async([{"Message": message}], queue=QueueNames.NOTIFY)
complaint = Complaint(
notification_id=notification.id,
service_id=notification.service_id,
ses_feedback_id=ses_complaint.get("feedbackId", None) if ses_complaint else None,
complaint_type=ses_complaint.get("complaintFeedbackType", None) if ses_complaint else None,
complaint_date=ses_complaint.get("timestamp", None) if ses_complaint else None,
)
save_complaint(complaint)
return complaint, notification, recipient_email
def remove_mail_headers(dict_to_edit):
if dict_to_edit["mail"].get("headers"):
dict_to_edit["mail"].pop("headers")
if dict_to_edit["mail"].get("commonHeaders"):
dict_to_edit["mail"].pop("commonHeaders")
def remove_emails_from_bounce(bounce_dict):
remove_mail_headers(bounce_dict)
bounce_dict["mail"].pop("destination", None)
bounce_dict["bounce"].pop("bouncedRecipients", None)
def remove_emails_from_complaint(complaint_dict):
remove_mail_headers(complaint_dict)
complaint_dict["complaint"].pop("complainedRecipients")
return complaint_dict["mail"].pop("destination")
def check_and_queue_callback_task(notification):
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS)
def _check_and_queue_complaint_callback_task(complaint, notification, recipient):
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
return jsonify(
result="success", message="SES-SNS callback succeeded"
), 200

View File

@@ -9,6 +9,7 @@ from app.errors import InvalidRequest, register_errors
sms_callback_blueprint = Blueprint("sms_callback", __name__, url_prefix="/notifications/sms")
register_errors(sms_callback_blueprint)
# TODO SNS SMS delivery receipts delivered here
# @sms_callback_blueprint.route('/mmg', methods=['POST'])
# def process_mmg_response():

View File

@@ -2,17 +2,17 @@ from datetime import datetime
from urllib.parse import unquote
import iso8601
from flask import Blueprint, abort, current_app, jsonify, request, json
from flask import Blueprint, abort, current_app, json, jsonify, request
from gds_metrics.metrics import Counter
from notifications_utils.recipients import try_validate_and_format_phone_number
from app.celery import tasks
from app.celery.validate_sns_message import sns_notification_handler
from app.config import QueueNames
from app.dao.inbound_sms_dao import dao_create_inbound_sms
from app.dao.services_dao import dao_fetch_service_by_inbound_number
from app.errors import register_errors, InvalidRequest
from app.errors import InvalidRequest, register_errors
from app.models import INBOUND_SMS_TYPE, SMS_TYPE, InboundSms
from app.notifications.sns_handlers import sns_notification_handler
receive_notifications_blueprint = Blueprint('receive_notifications', __name__)
register_errors(receive_notifications_blueprint)
@@ -83,90 +83,90 @@ def receive_sns_sms():
), 200
@receive_notifications_blueprint.route('/notifications/sms/receive/mmg', methods=['POST'])
def receive_mmg_sms():
"""
{
'MSISDN': '447123456789'
'Number': '40604',
'Message': 'some+uri+encoded+message%3A',
'ID': 'SOME-MMG-SPECIFIC-ID',
'DateRecieved': '2017-05-21+11%3A56%3A11'
}
"""
post_data = request.get_json()
# @receive_notifications_blueprint.route('/notifications/sms/receive/mmg', methods=['POST'])
# def receive_mmg_sms():
# """
# {
# 'MSISDN': '447123456789'
# 'Number': '40604',
# 'Message': 'some+uri+encoded+message%3A',
# 'ID': 'SOME-MMG-SPECIFIC-ID',
# 'DateRecieved': '2017-05-21+11%3A56%3A11'
# }
# """
# post_data = request.get_json()
auth = request.authorization
# auth = request.authorization
if not auth:
current_app.logger.warning("Inbound sms (MMG) no auth header")
abort(401)
elif auth.username not in current_app.config['MMG_INBOUND_SMS_USERNAME'] \
or auth.password not in current_app.config['MMG_INBOUND_SMS_AUTH']:
current_app.logger.warning("Inbound sms (MMG) incorrect username ({}) or password".format(auth.username))
abort(403)
# if not auth:
# current_app.logger.warning("Inbound sms (MMG) no auth header")
# abort(401)
# elif auth.username not in current_app.config['MMG_INBOUND_SMS_USERNAME'] \
# or auth.password not in current_app.config['MMG_INBOUND_SMS_AUTH']:
# current_app.logger.warning("Inbound sms (MMG) incorrect username ({}) or password".format(auth.username))
# abort(403)
inbound_number = strip_leading_forty_four(post_data['Number'])
# inbound_number = strip_leading_forty_four(post_data['Number'])
service = fetch_potential_service(inbound_number, 'mmg')
if not service:
# since this is an issue with our service <-> number mapping, or no inbound_sms service permission
# we should still tell MMG that we received it successfully
return 'RECEIVED', 200
# service = fetch_potential_service(inbound_number, 'mmg')
# if not service:
# # since this is an issue with our service <-> number mapping, or no inbound_sms service permission
# # we should still tell MMG that we received it successfully
# return 'RECEIVED', 200
INBOUND_SMS_COUNTER.labels("mmg").inc()
# INBOUND_SMS_COUNTER.labels("mmg").inc()
inbound = create_inbound_sms_object(service,
content=format_mmg_message(post_data["Message"]),
from_number=post_data['MSISDN'],
provider_ref=post_data["ID"],
date_received=post_data.get('DateRecieved'),
provider_name="mmg")
# inbound = create_inbound_sms_object(service,
# content=format_mmg_message(post_data["Message"]),
# from_number=post_data['MSISDN'],
# provider_ref=post_data["ID"],
# date_received=post_data.get('DateRecieved'),
# provider_name="mmg")
tasks.send_inbound_sms_to_service.apply_async([str(inbound.id), str(service.id)], queue=QueueNames.NOTIFY)
# tasks.send_inbound_sms_to_service.apply_async([str(inbound.id), str(service.id)], queue=QueueNames.NOTIFY)
current_app.logger.debug(
'{} received inbound SMS with reference {} from MMG'.format(service.id, inbound.provider_reference))
return jsonify({
"status": "ok"
}), 200
# current_app.logger.debug(
# '{} received inbound SMS with reference {} from MMG'.format(service.id, inbound.provider_reference))
# return jsonify({
# "status": "ok"
# }), 200
@receive_notifications_blueprint.route('/notifications/sms/receive/firetext', methods=['POST'])
def receive_firetext_sms():
post_data = request.form
# @receive_notifications_blueprint.route('/notifications/sms/receive/firetext', methods=['POST'])
# def receive_firetext_sms():
# post_data = request.form
auth = request.authorization
if not auth:
current_app.logger.warning("Inbound sms (Firetext) no auth header")
abort(401)
elif auth.username != 'notify' or auth.password not in current_app.config['FIRETEXT_INBOUND_SMS_AUTH']:
current_app.logger.warning("Inbound sms (Firetext) incorrect username ({}) or password".format(auth.username))
abort(403)
# auth = request.authorization
# if not auth:
# current_app.logger.warning("Inbound sms (Firetext) no auth header")
# abort(401)
# elif auth.username != 'notify' or auth.password not in current_app.config['FIRETEXT_INBOUND_SMS_AUTH']:
# current_app.logger.warning("Inbound sms (Firetext) incorrect username ({}) or password".format(auth.username))
# abort(403)
inbound_number = strip_leading_forty_four(post_data['destination'])
# inbound_number = strip_leading_forty_four(post_data['destination'])
service = fetch_potential_service(inbound_number, 'firetext')
if not service:
return jsonify({
"status": "ok"
}), 200
# service = fetch_potential_service(inbound_number, 'firetext')
# if not service:
# return jsonify({
# "status": "ok"
# }), 200
inbound = create_inbound_sms_object(service=service,
content=post_data["message"],
from_number=post_data['source'],
provider_ref=None,
date_received=post_data['time'],
provider_name="firetext")
# inbound = create_inbound_sms_object(service=service,
# content=post_data["message"],
# from_number=post_data['source'],
# provider_ref=None,
# date_received=post_data['time'],
# provider_name="firetext")
INBOUND_SMS_COUNTER.labels("firetext").inc()
# INBOUND_SMS_COUNTER.labels("firetext").inc()
tasks.send_inbound_sms_to_service.apply_async([str(inbound.id), str(service.id)], queue=QueueNames.NOTIFY)
current_app.logger.debug(
'{} received inbound SMS with reference {} from Firetext'.format(service.id, inbound.provider_reference))
return jsonify({
"status": "ok"
}), 200
# tasks.send_inbound_sms_to_service.apply_async([str(inbound.id), str(service.id)], queue=QueueNames.NOTIFY)
# current_app.logger.debug(
# '{} received inbound SMS with reference {} from Firetext'.format(service.id, inbound.provider_reference))
# return jsonify({
# "status": "ok"
# }), 200
def format_mmg_message(message):

View File

@@ -0,0 +1,113 @@
import base64
import re
from urllib.parse import urlparse
import oscrypto.asymmetric
import oscrypto.errors
import requests
import six
from app import redis_store
from app.config import Config
USE_CACHE = True
VALIDATE_ARN = True
VALID_SNS_TOPICS = Config.VALID_SNS_TOPICS
_signing_cert_cache = {}
_cert_url_re = re.compile(
r'sns\.([a-z]{1,3}-[a-z]+-[0-9]{1,2})\.amazonaws\.com',
)
class ValidationError(Exception):
"""
ValidationError. Raised when a message fails integrity checks.
"""
def get_certificate(url):
if USE_CACHE:
res = redis_store.get(url)
if res is not None:
return res
res = requests.get(url).text
redis_store.set(url, res, ex=60 * 60) # 60 minutes
return res
else:
return requests.get(url).text
def validate_arn(sns_payload):
if VALIDATE_ARN:
arn = sns_payload.get('TopicArn')
topic_name = arn.split(':')[5]
if topic_name not in VALID_SNS_TOPICS:
raise ValidationError("Invalid Topic Name")
def get_string_to_sign(sns_payload):
payload_type = sns_payload.get('Type')
if payload_type in ['SubscriptionConfirmation', 'UnsubscribeConfirmation']:
fields = ['Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type']
elif payload_type == 'Notification':
fields = ['Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type']
else:
raise ValidationError("Unexpected Message Type")
string_to_sign = ''
for field in fields:
field_value = sns_payload.get(field)
if not isinstance(field_value, str):
if field == 'Subject' and field_value == None:
continue
raise ValidationError(f"In {field}, found non-string value: {field_value}")
string_to_sign += field + '\n' + field_value + '\n'
if isinstance(string_to_sign, six.text_type):
string_to_sign = string_to_sign.encode()
return string_to_sign
def validate_sns_cert(sns_payload):
"""
Adapted from the solution posted at
https://github.com/boto/boto3/issues/2508#issuecomment-992931814
Modified to swap m2crypto for oscrypto
"""
if not isinstance(sns_payload, dict):
raise ValidationError("Unexpected message type {!r}".format(type(sns_payload).__name__))
# Amazon SNS currently supports signature version 1.
if sns_payload.get('SignatureVersion') != '1':
raise ValidationError("Wrong Signature Version (expected 1)")
validate_arn(sns_payload)
string_to_sign = get_string_to_sign(sns_payload)
# Key signing cert url via Lambda and via webhook are slightly different
signing_cert_url = sns_payload.get('SigningCertUrl') if 'SigningCertUrl' in sns_payload else sns_payload.get('SigningCertURL')
if not isinstance(signing_cert_url, str):
raise ValidationError("Signing cert url must be a string")
cert_scheme, cert_netloc, *_ = urlparse(signing_cert_url)
if cert_scheme != 'https' or not re.match(_cert_url_re, cert_netloc):
raise ValidationError("Cert does not appear to be from AWS")
certificate = _signing_cert_cache.get(signing_cert_url)
if certificate is None:
certificate = get_certificate(signing_cert_url)
if isinstance(certificate, six.text_type):
certificate = certificate.encode()
signature = base64.b64decode(sns_payload["Signature"])
try:
oscrypto.asymmetric.rsa_pkcs1v15_verify(
oscrypto.asymmetric.load_certificate(certificate),
signature,
string_to_sign,
"sha1"
)
return True
except oscrypto.errors.SignatureError:
raise ValidationError("Invalid signature")

View File

@@ -0,0 +1,66 @@
import enum
from datetime import timedelta
from json import decoder
import requests
from flask import current_app, json
from app.errors import InvalidRequest
from app.notifications.sns_cert_validator import validate_sns_cert
DEFAULT_MAX_AGE = timedelta(days=10000)
class SNSMessageType(enum.Enum):
SubscriptionConfirmation = 'SubscriptionConfirmation'
Notification = 'Notification'
UnsubscribeConfirmation = 'UnsubscribeConfirmation'
class InvalidMessageTypeException(Exception):
pass
def verify_message_type(message_type: str):
try:
SNSMessageType(message_type)
except ValueError:
raise InvalidRequest("SES-SNS callback failed: invalid message type", 400)
def sns_notification_handler(data, headers):
message_type = headers.get('x-amz-sns-message-type')
try:
verify_message_type(message_type)
except InvalidMessageTypeException:
current_app.logger.exception(f"Response headers: {headers}\nResponse data: {data}")
raise InvalidRequest("SES-SNS callback failed: invalid message type", 400)
try:
message = json.loads(data.decode('utf-8'))
except decoder.JSONDecodeError:
current_app.logger.exception(f"Response headers: {headers}\nResponse data: {data}")
raise InvalidRequest("SES-SNS callback failed: invalid JSON given", 400)
try:
validate_sns_cert(message)
except Exception as e:
current_app.logger.error(f"SES-SNS callback failed: validation failed with error: Signature validation failed with error {e}")
raise InvalidRequest("SES-SNS callback failed: validation failed", 400)
if message.get('Type') == 'SubscriptionConfirmation':
# NOTE once a request is sent to SubscribeURL, AWS considers Notify a confirmed subscriber to this topic
url = message.get('SubscribeUrl') if 'SubscribeUrl' in message else message.get('SubscribeURL')
response = requests.get(url)
try:
response.raise_for_status()
except Exception as e:
current_app.logger.warning(f"Attempt to raise_for_status()SubscriptionConfirmation Type message files for response: {response.text} with error {e}")
raise InvalidRequest("SES-SNS callback failed: attempt to raise_for_status()SubscriptionConfirmation Type message failed", 400)
current_app.logger.info("SES-SNS auto-confirm subscription callback succeeded")
return message
# TODO remove after smoke testing on prod is implemented
current_app.logger.info(f"SNS message: {message} is a valid message. Attempting to process it now.")
return message