diff --git a/app/__init__.py b/app/__init__.py index 2fffe2673..035ce112a 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -139,6 +139,7 @@ def register_blueprint(application): ) from app.billing.rest import billing_blueprint from app.broadcast_message.rest import broadcast_message_blueprint + from app.celery.process_ses_receipts_tasks import ses_callback_blueprint from app.complaint.complaint_rest import complaint_blueprint from app.email_branding.rest import email_branding_blueprint from app.events.rest import events as events_blueprint @@ -196,6 +197,10 @@ def register_blueprint(application): status_blueprint.before_request(requires_no_auth) application.register_blueprint(status_blueprint) + + # delivery receipts + ses_callback_blueprint.before_request(requires_no_auth) + application.register_blueprint(ses_callback_blueprint) # delivery receipts # TODO: make sure research mode can still trigger sms callbacks, then re-enable this diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index 04a8d14f1..bfa21ad89 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -1,80 +1,159 @@ +import enum from datetime import datetime, timedelta +from json import decoder import iso8601 +import requests +import validatesns from celery.exceptions import Retry -from flask import current_app, json +from flask import Blueprint, current_app, json, jsonify, request +from notifications_utils.statsd_decorators import statsd from sqlalchemy.orm.exc import NoResultFound -from app import notify_celery, statsd_client -from app.clients.email.aws_ses import get_aws_responses +from app import notify_celery, redis_store, statsd_client from app.config import QueueNames from app.dao import notifications_dao +from app.errors import InvalidRequest, register_errors from app.models import NOTIFICATION_PENDING, NOTIFICATION_SENDING from app.notifications.notifications_ses_callback import ( _check_and_queue_complaint_callback_task, + _determine_notification_bounce_type, check_and_queue_callback_task, - determine_notification_bounce_type, + get_aws_responses, handle_complaint, ) +ses_callback_blueprint = Blueprint('notifications_ses_callback', __name__) + +register_errors(ses_callback_blueprint) + +class SNSMessageType(enum.Enum): + SubscriptionConfirmation = 'SubscriptionConfirmation' + Notification = 'Notification' + UnsubscribeConfirmation = 'UnsubscribeConfirmation' + + +class InvalidMessageTypeException(Exception): + pass + + +def verify_message_type(message_type: str): + try: + SNSMessageType(message_type) + except ValueError: + raise InvalidMessageTypeException(f'{message_type} is not a valid message type.') + + +def get_certificate(url): + res = redis_store.get(url) + if res is not None: + return res + res = requests.get(url).content + redis_store.set(url, res, ex=60 * 60) # 60 minutes + return res + +# 400 counts as a permanent failure so SNS will not retry. +# 500 counts as a failed delivery attempt so SNS will retry. +# See https://docs.aws.amazon.com/sns/latest/dg/DeliveryPolicies.html#DeliveryPolicies +# This should not be here, it used to be in notifications/notifications_ses_callback. It then +# got refactored into a task, which is fine, but it created a circular dependency. Will need +# to investigate why GDS extracted this into a lambda +@ses_callback_blueprint.route('/notifications/email/ses', methods=['POST']) +def sns_callback_handler(): + message_type = request.headers.get('x-amz-sns-message-type') + try: + verify_message_type(message_type) + except InvalidMessageTypeException: + raise InvalidRequest("SES-SNS callback failed: invalid message type", 400) + + try: + message = json.loads(request.data) + except decoder.JSONDecodeError: + raise InvalidRequest("SES-SNS callback failed: invalid JSON given", 400) + + try: + validatesns.validate(message, get_certificate=get_certificate) + except validatesns.ValidationError: + raise InvalidRequest("SES-SNS callback failed: validation failed", 400) + + if message.get('Type') == 'SubscriptionConfirmation': + url = message.get('SubscribeURL') + response = requests.get(url) + try: + response.raise_for_status() + except Exception as e: + current_app.logger.warning("Response: {}".format(response.text)) + raise e + + return jsonify( + result="success", message="SES-SNS auto-confirm callback succeeded" + ), 200 + + process_ses_results.apply_async([{"Message": message.get("Message")}], queue=QueueNames.NOTIFY) + + return jsonify( + result="success", message="SES-SNS callback succeeded" + ), 200 + @notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300) +@statsd(namespace="tasks") def process_ses_results(self, response): try: - ses_message = json.loads(response['Message']) - notification_type = ses_message['notificationType'] - bounce_message = None - - if notification_type == 'Bounce': - notification_type, bounce_message = determine_notification_bounce_type(notification_type, ses_message) - elif notification_type == 'Complaint': + ses_message = json.loads(response["Message"]) + notification_type = ses_message["notificationType"] + print(f"ses_message is: {ses_message}") + if notification_type == "Complaint": _check_and_queue_complaint_callback_task(*handle_complaint(ses_message)) return True - aws_response_dict = get_aws_responses(notification_type) + aws_response_dict = get_aws_responses(ses_message) - notification_status = aws_response_dict['notification_status'] - reference = ses_message['mail']['messageId'] + notification_status = aws_response_dict["notification_status"] + reference = ses_message["mail"]["messageId"] + + print(f"notification_status is: {notification_status}") try: - notification = notifications_dao.dao_get_notification_or_history_by_reference(reference=reference) + notification = notifications_dao.dao_get_notification_by_reference(reference) except NoResultFound: - message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None) + message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace(tzinfo=None) if datetime.utcnow() - message_time < timedelta(minutes=5): - current_app.logger.info( - f"notification not found for reference: {reference} (update to {notification_status}). " - f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue" - ) self.retry(queue=QueueNames.RETRY) else: current_app.logger.warning( - f"notification not found for reference: {reference} (update to {notification_status})" + "notification not found for reference: {} (update to {})".format(reference, notification_status) ) return - if bounce_message: - current_app.logger.info(f"SES bounce for notification ID {notification.id}: {bounce_message}") - - if notification.status not in [NOTIFICATION_SENDING, NOTIFICATION_PENDING]: + if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}: notifications_dao._duplicate_update_warning( - notification=notification, - status=notification_status + notification, + notification_status ) return + + notifications_dao._update_notification_status( + notification=notification, + status=notification_status, + provider_response=aws_response_dict["provider_response"], + ) + + if not aws_response_dict["success"]: + current_app.logger.info( + "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( + notification.id, reference, aws_response_dict["message"] + ) + ) else: - notifications_dao.dao_update_notifications_by_reference( - references=[reference], - update_dict={'status': notification_status} + current_app.logger.info( + "SES callback return status of {} for notification: {}".format(notification_status, notification.id) ) - statsd_client.incr('callback.ses.{}'.format(notification_status)) + statsd_client.incr("callback.ses.{}".format(notification_status)) if notification.sent_at: - statsd_client.timing_with_dates( - f'callback.ses.{notification_status}.elapsed-time', - datetime.utcnow(), - notification.sent_at - ) + statsd_client.timing_with_dates("callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at) check_and_queue_callback_task(notification) @@ -84,5 +163,74 @@ def process_ses_results(self, response): raise except Exception as e: - current_app.logger.exception('Error processing SES results: {}'.format(type(e))) + current_app.logger.exception("Error processing SES results: {}".format(type(e))) self.retry(queue=QueueNames.RETRY) + +# def process_ses_results(self, response): +# try: +# ses_message = json.loads(response['Message']) +# print(f"ses_message is {ses_message}") +# notification_type = ses_message['notificationType'] +# print(f"notification_type is {notification_type}") +# if notification_type == 'Bounce': +# notification_type = _determine_notification_bounce_type(ses_message) +# elif notification_type == 'Complaint': +# _check_and_queue_complaint_callback_task(*handle_complaint(ses_message)) +# return True +# aws_response_dict = get_aws_responses(notification_type) +# print(f"aws_response_dict is {aws_response_dict}") +# notification_status = aws_response_dict['notification_status'] +# print(f"notification_status is {notification_status}") +# reference = ses_message['mail']['messageId'] +# try: +# notification = notifications_dao.dao_get_notification_by_reference(reference) +# print(f"notification is {notification}") +# except NoResultFound: +# print(f"notification not found") +# message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None) +# if datetime.utcnow() - message_time < timedelta(minutes=5): +# self.retry(queue=QueueNames.RETRY) +# else: +# current_app.logger.warning( +# "notification not found for reference: {} (update to {})".format(reference, notification_status) +# ) +# return +# print(f"notification.status is {notification.status}") +# if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}: +# print(f"notification.status is not in [{NOTIFICATION_SENDING}, {NOTIFICATION_PENDING}]") +# notifications_dao._duplicate_update_warning(notification, notification_status) +# return +# notifications_dao._update_notification_status( +# notification=notification, +# status=notification_status, +# provider_response=None +# ) +# if not aws_response_dict['success']: +# current_app.logger.info( +# "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( +# notification.id, reference, aws_response_dict['message'] +# ) +# ) +# print( +# "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( +# notification.id, reference, aws_response_dict['message'] +# ) +# ) +# else: +# current_app.logger.info('SES callback return status of {} for notification: {}'.format( +# notification_status, notification.id +# )) +# print('SES callback return status of {} for notification: {}'.format( +# notification_status, notification.id +# )) +# statsd_client.incr('callback.ses.{}'.format(notification_status)) +# if notification.sent_at: +# statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at) +# check_and_queue_callback_task(notification) +# return True +# except Retry: +# raise +# except Exception as e: +# current_app.logger.exception('Error processing SES results: {}'.format(type(e))) +# self.retry(queue=QueueNames.RETRY) + \ No newline at end of file diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 4942de527..e5698a3b5 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -87,16 +87,21 @@ def country_records_delivery(phone_prefix): dlr = INTERNATIONAL_BILLING_RATES[phone_prefix]['attributes']['dlr'] return dlr and dlr.lower() == 'yes' +def _decide_permanent_temporary_failure(current_status, status): + # If we go from pending to delivered we need to set failure type as temporary-failure + if current_status == NOTIFICATION_PENDING and status == NOTIFICATION_PERMANENT_FAILURE: + status = NOTIFICATION_TEMPORARY_FAILURE + return status -def _update_notification_status(notification, status, detailed_status_code=None): - # status = _decide_permanent_temporary_failure( - # status=status, notification=notification, detailed_status_code=detailed_status_code - # ) - # notification.status = status - # dao_update_notification(notification) + +def _update_notification_status(notification, status, provider_response=None): + status = _decide_permanent_temporary_failure(current_status=notification.status, status=status) + notification.status = status + if provider_response: + notification.provider_response = provider_response + dao_update_notification(notification) return notification - @autocommit def update_notification_status_by_id(notification_id, status, sent_by=None, detailed_status_code=None): notification = Notification.query.with_for_update().filter(Notification.id == notification_id).first() @@ -599,6 +604,14 @@ def dao_get_notification_or_history_by_reference(reference): NotificationHistory.reference == reference ).one() +def dao_get_notification_history_by_reference(reference): + try: + # This try except is necessary because in test keys and research mode does not create notification history. + # Otherwise we could just search for the NotificationHistory object + return Notification.query.filter(Notification.reference == reference).one() + except NoResultFound: + return NotificationHistory.query.filter(NotificationHistory.reference == reference).one() + def dao_get_notifications_processing_time_stats(start_date, end_date): """ diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py index d546cbc0c..48c634e76 100644 --- a/app/delivery/send_to_providers.py +++ b/app/delivery/send_to_providers.py @@ -163,7 +163,10 @@ def update_notification_to_sending(notification, provider): notification.sent_at = datetime.utcnow() notification.sent_by = provider.name if notification.status not in NOTIFICATION_STATUS_TYPES_COMPLETED: - notification.status = NOTIFICATION_SENT if notification.international else NOTIFICATION_SENDING + # We currently have no callback method for SNS + # TODO create celery task to request delivery receipts from cloudwatch api + notification.status = NOTIFICATION_SENT if notification.notification_type == "sms" else NOTIFICATION_SENDING + dao_update_notification(notification) diff --git a/app/models.py b/app/models.py index ff6a8a8d9..a69188531 100644 --- a/app/models.py +++ b/app/models.py @@ -1505,6 +1505,8 @@ class Notification(db.Model): document_download_count = db.Column(db.Integer, nullable=True) postage = db.Column(db.String, nullable=True) + provider_response = db.Column(db.Text, nullable=True) + # queue_name = db.Column(db.Text, nullable=True) __table_args__ = ( db.ForeignKeyConstraint( @@ -1707,6 +1709,7 @@ class Notification(db.Model): "postcode": None, "type": self.notification_type, "status": self.get_letter_status() if self.notification_type == LETTER_TYPE else self.status, + "provider_response": self.provider_response, "template": template_dict, "body": self.content, "subject": self.subject, diff --git a/app/notifications/callbacks.py b/app/notifications/callbacks.py new file mode 100644 index 000000000..7c3f1eed6 --- /dev/null +++ b/app/notifications/callbacks.py @@ -0,0 +1,52 @@ +from app.celery.service_callback_tasks import send_delivery_status_to_service +from app.config import QueueNames +from app.dao.service_callback_api_dao import ( + get_service_delivery_status_callback_api_for_service, +) + + +def check_and_queue_callback_task(notification): + # queue callback task only if the service_callback_api exists + service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) + if service_callback_api: + notification_data = create_delivery_status_callback_data(notification, service_callback_api) + + send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS) + + +def create_delivery_status_callback_data(notification, service_callback_api): + from app import encryption + from app.utils import DATETIME_FORMAT + + data = { + "notification_id": str(notification.id), + "notification_client_reference": notification.client_reference, + "notification_to": notification.to, + "notification_status": notification.status, + "notification_provider_response": notification.provider_response, + "notification_created_at": notification.created_at.strftime(DATETIME_FORMAT), + "notification_updated_at": notification.updated_at.strftime(DATETIME_FORMAT) if notification.updated_at else None, + "notification_sent_at": notification.sent_at.strftime(DATETIME_FORMAT) if notification.sent_at else None, + "notification_type": notification.notification_type, + "service_callback_api_url": service_callback_api.url, + "service_callback_api_bearer_token": service_callback_api.bearer_token, + } + + return encryption.encrypt(data) + + +def create_complaint_callback_data(complaint, notification, service_callback_api, recipient): + from app import encryption + from app.utils import DATETIME_FORMAT + + data = { + "complaint_id": str(complaint.id), + "notification_id": str(notification.id), + "reference": notification.client_reference, + "to": recipient, + "complaint_date": complaint.complaint_date.strftime(DATETIME_FORMAT), + "service_callback_api_url": service_callback_api.url, + "service_callback_api_bearer_token": service_callback_api.bearer_token, + } + + return encryption.encrypt(data) \ No newline at end of file diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index c1fdceaa0..1de6aeed6 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -1,4 +1,4 @@ -from flask import current_app +from flask import current_app, json from app.celery.service_callback_tasks import ( create_complaint_callback_data, @@ -8,65 +8,125 @@ from app.celery.service_callback_tasks import ( ) from app.config import QueueNames from app.dao.complaint_dao import save_complaint -from app.dao.notifications_dao import ( - dao_get_notification_or_history_by_reference, -) +from app.dao.notifications_dao import dao_get_notification_history_by_reference from app.dao.service_callback_api_dao import ( get_service_complaint_callback_api_for_service, get_service_delivery_status_callback_api_for_service, ) from app.models import Complaint +from app.notifications.callbacks import create_complaint_callback_data -def determine_notification_bounce_type(notification_type, ses_message): +def _determine_notification_bounce_type(ses_message): + notification_type = ses_message["notificationType"] + if notification_type in ["Delivery", "Complaint"]: + return notification_type + + if notification_type != "Bounce": + raise KeyError(f"Unhandled notification type {notification_type}") + remove_emails_from_bounce(ses_message) - if ses_message['bounce']['bounceType'] == 'Permanent': - notification_type = ses_message['bounce']['bounceType'] # permanent or not - else: - notification_type = 'Temporary' - return notification_type, ses_message + current_app.logger.info("SES bounce dict: {}".format(json.dumps(ses_message).replace("{", "(").replace("}", ")"))) + if ses_message["bounce"]["bounceType"] == "Permanent": + return "Permanent" + return "Temporary" + + +def _determine_provider_response(ses_message): + if ses_message["notificationType"] != "Bounce": + return None + + bounce_type = ses_message["bounce"]["bounceType"] + bounce_subtype = ses_message["bounce"]["bounceSubType"] + + # See https://docs.aws.amazon.com/ses/latest/DeveloperGuide/event-publishing-retrieving-sns-contents.html + if bounce_type == "Permanent" and bounce_subtype == "Suppressed": + return "The email address is on our email provider suppression list" + elif bounce_type == "Permanent" and bounce_subtype == "OnAccountSuppressionList": + return "The email address is on the GC Notify suppression list" + elif bounce_type == "Transient" and bounce_subtype == "AttachmentRejected": + return "The email was rejected because of its attachments" + + return None + + +def get_aws_responses(ses_message): + status = _determine_notification_bounce_type(ses_message) + + base = { + "Permanent": { + "message": "Hard bounced", + "success": False, + "notification_status": "permanent-failure", + }, + "Temporary": { + "message": "Soft bounced", + "success": False, + "notification_status": "temporary-failure", + }, + "Delivery": { + "message": "Delivered", + "success": True, + "notification_status": "delivered", + }, + "Complaint": { + "message": "Complaint", + "success": True, + "notification_status": "delivered", + }, + }[status] + + base["provider_response"] = _determine_provider_response(ses_message) + + return base def handle_complaint(ses_message): recipient_email = remove_emails_from_complaint(ses_message)[0] - current_app.logger.info("Complaint from SES: \n{}".format(ses_message)) + current_app.logger.info("Complaint from SES: \n{}".format(json.dumps(ses_message).replace("{", "(").replace("}", ")"))) try: - reference = ses_message['mail']['messageId'] + reference = ses_message["mail"]["messageId"] except KeyError as e: current_app.logger.exception("Complaint from SES failed to get reference from message", e) return - notification = dao_get_notification_or_history_by_reference(reference) - ses_complaint = ses_message.get('complaint', None) + notification = dao_get_notification_history_by_reference(reference) + ses_complaint = ses_message.get("complaint", None) complaint = Complaint( notification_id=notification.id, service_id=notification.service_id, - ses_feedback_id=ses_complaint.get('feedbackId', None) if ses_complaint else None, - complaint_type=ses_complaint.get('complaintFeedbackType', None) if ses_complaint else None, - complaint_date=ses_complaint.get('timestamp', None) if ses_complaint else None + ses_feedback_id=ses_complaint.get("feedbackId", None) if ses_complaint else None, + complaint_type=ses_complaint.get("complaintFeedbackType", None) if ses_complaint else None, + complaint_date=ses_complaint.get("timestamp", None) if ses_complaint else None, ) save_complaint(complaint) return complaint, notification, recipient_email def remove_mail_headers(dict_to_edit): - if dict_to_edit['mail'].get('headers'): - dict_to_edit['mail'].pop('headers') - if dict_to_edit['mail'].get('commonHeaders'): - dict_to_edit['mail'].pop('commonHeaders') + if dict_to_edit["mail"].get("headers"): + dict_to_edit["mail"].pop("headers") + if dict_to_edit["mail"].get("commonHeaders"): + dict_to_edit["mail"].pop("commonHeaders") def remove_emails_from_bounce(bounce_dict): remove_mail_headers(bounce_dict) - bounce_dict['mail'].pop('destination') - bounce_dict['bounce'].pop('bouncedRecipients') + bounce_dict["mail"].pop("destination") + bounce_dict["bounce"].pop("bouncedRecipients") def remove_emails_from_complaint(complaint_dict): remove_mail_headers(complaint_dict) - complaint_dict['complaint'].pop('complainedRecipients') - return complaint_dict['mail'].pop('destination') + complaint_dict["complaint"].pop("complainedRecipients") + return complaint_dict["mail"].pop("destination") +def check_and_queue_callback_task(notification): + # queue callback task only if the service_callback_api exists + service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) + if service_callback_api: + notification_data = create_delivery_status_callback_data(notification, service_callback_api) + send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS) def check_and_queue_callback_task(notification): # queue callback task only if the service_callback_api exists @@ -76,10 +136,9 @@ def check_and_queue_callback_task(notification): send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS) - def _check_and_queue_complaint_callback_task(complaint, notification, recipient): # queue callback task only if the service_callback_api exists service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id) if service_callback_api: complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient) - send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) + send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) \ No newline at end of file diff --git a/migrations/versions/0376_add_provider_response.py b/migrations/versions/0376_add_provider_response.py new file mode 100644 index 000000000..20dc83273 --- /dev/null +++ b/migrations/versions/0376_add_provider_response.py @@ -0,0 +1,30 @@ +"""empty message + +Revision ID: 0376_add_provider_response +Revises: 0375_fix_service_name +Create Date: 2022-09-14 11:04:15.888017 + +""" +# revision identifiers, used by Alembic. +from datetime import datetime + +revision = '0376_add_provider_response' +down_revision = '0375_fix_service_name' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.add_column('notifications', sa.Column('provider_response', sa.Text(), nullable=True)) + op.add_column('notifications', sa.Column('queue_name', sa.Text(), nullable=True)) + ### end Alembic commands ### + + +def downgrade(): + ### commands auto generated by Alembic - please adjust! ### + op.drop_column('notifications', 'provider_response') + op.drop_column('notifications', 'queue_name') + ### end Alembic commands ### + \ No newline at end of file diff --git a/requirements.in b/requirements.in index de8a98254..fec2b41c6 100644 --- a/requirements.in +++ b/requirements.in @@ -19,6 +19,7 @@ marshmallow==3.15.0 psycopg2-binary==2.9.3 PyJWT==2.4.0 SQLAlchemy==1.4.40 +validatesns==0.1.1 cachetools==5.1.0 beautifulsoup4==4.11.1 lxml==4.9.1 diff --git a/requirements.txt b/requirements.txt index 154c1c908..2cdc474ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,8 @@ amqp==5.1.1 # via kombu arrow==1.2.2 # via isoduration +asn1crypto==1.5.1 + # via oscrypto async-timeout==4.0.2 # via redis attrs==21.4.0 @@ -169,6 +171,8 @@ notifications-utils @ git+https://github.com/GSA/notifications-utils.git # via -r requirements.in orderedset==2.0.3 # via notifications-utils +oscrypto==1.3.0 + # via validatesns packaging==21.3 # via # bleach @@ -247,6 +251,7 @@ six==1.16.0 # flask-marshmallow # python-dateutil # rfc3339-validator + # validatesns smartypants==2.0.1 # via notifications-utils soupsieve==2.3.2.post1 @@ -267,6 +272,8 @@ urllib3==1.26.9 # via # botocore # requests +validatesns==0.1.1 + # via -r requirements.in vine==5.0.0 # via # amqp diff --git a/tests/app/celery/test_process_ses_receipts_tasks.py b/tests/app/celery/test_process_ses_receipts_tasks.py index 4cb614ae8..51fda72aa 100644 --- a/tests/app/celery/test_process_ses_receipts_tasks.py +++ b/tests/app/celery/test_process_ses_receipts_tasks.py @@ -10,12 +10,16 @@ from app.celery.research_mode_tasks import ( ses_notification_callback, ses_soft_bounce_callback, ) +from app.celery.service_callback_tasks import ( + create_delivery_status_callback_data, +) from app.dao.notifications_dao import get_notification_by_id from app.models import Complaint, Notification from app.notifications.notifications_ses_callback import ( remove_emails_from_bounce, remove_emails_from_complaint, ) +from tests.app.conftest import create_sample_notification from tests.app.db import ( create_notification, create_service_callback_api, @@ -23,16 +27,87 @@ from tests.app.db import ( ) +def test_notifications_ses_400_with_invalid_header(client): + data = json.dumps({"foo": "bar"}) + response = client.post( + path='/notifications/email/ses', + data=data, + headers=[('Content-Type', 'application/json')] + ) + assert response.status_code == 400 + + +def test_notifications_ses_400_with_invalid_message_type(client): + data = json.dumps({"foo": "bar"}) + response = client.post( + path='/notifications/email/ses', + data=data, + headers=[('Content-Type', 'application/json'), ('x-amz-sns-message-type', 'foo')] + ) + assert response.status_code == 400 + assert "SES-SNS callback failed: invalid message type" in response.get_data(as_text=True) + + +def test_notifications_ses_400_with_invalid_json(client): + data = "FOOO" + response = client.post( + path='/notifications/email/ses', + data=data, + headers=[('Content-Type', 'application/json'), ('x-amz-sns-message-type', 'Notification')] + ) + assert response.status_code == 400 + assert "SES-SNS callback failed: invalid JSON given" in response.get_data(as_text=True) + + +def test_notifications_ses_400_with_certificate(client): + data = json.dumps({"foo": "bar"}) + response = client.post( + path='/notifications/email/ses', + data=data, + headers=[('Content-Type', 'application/json'), ('x-amz-sns-message-type', 'Notification')] + ) + assert response.status_code == 400 + assert "SES-SNS callback failed: validation failed" in response.get_data(as_text=True) + + +def test_notifications_ses_200_autoconfirms_subscription(client, mocker): + mocker.patch("validatesns.validate") + requests_mock = mocker.patch("requests.get") + data = json.dumps({"Type": "SubscriptionConfirmation", "SubscribeURL": "https://foo"}) + response = client.post( + path='/notifications/email/ses', + data=data, + headers=[('Content-Type', 'application/json'), ('x-amz-sns-message-type', 'SubscriptionConfirmation')] + ) + + requests_mock.assert_called_once_with("https://foo") + assert response.status_code == 200 + + +def test_notifications_ses_200_call_process_task(client, mocker): + mocker.patch("validatesns.validate") + process_mock = mocker.patch("app.celery.process_ses_receipts_tasks.process_ses_results.apply_async") + data = {"Type": "Notification", "foo": "bar"} + json_data = json.dumps(data) + response = client.post( + path='/notifications/email/ses', + data=json_data, + headers=[('Content-Type', 'application/json'), ('x-amz-sns-message-type', 'Notification')] + ) + + process_mock.assert_called_once_with([{'Message': None}], queue='notify-internal-tasks') + assert response.status_code == 200 + + def test_process_ses_results(sample_email_template): create_notification(sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending') assert process_ses_results(response=ses_notification_callback(reference='ref1')) -def test_process_ses_results_retry_called(sample_email_template, mocker): +def test_process_ses_results_retry_called(sample_email_template, _notify_db, mocker): create_notification(sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending') - - mocker.patch("app.dao.notifications_dao.dao_update_notifications_by_reference", side_effect=Exception("EXPECTED")) + mocker.patch("app.dao.notifications_dao._update_notification_status", side_effect=Exception("EXPECTED")) mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') process_ses_results(response=ses_notification_callback(reference='ref1')) assert mocked.call_count != 0 @@ -62,6 +137,7 @@ def test_remove_email_from_bounce(): def test_ses_callback_should_update_notification_status( client, + _notify_db, notify_db_session, sample_email_template, mocker): @@ -69,140 +145,159 @@ def test_ses_callback_should_update_notification_status( mocker.patch('app.statsd_client.incr') mocker.patch('app.statsd_client.timing_with_dates') send_mock = mocker.patch( - 'app.celery.process_ses_receipts_tasks.check_and_queue_callback_task' + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' ) - notification = create_notification( + notification = create_sample_notification( + _notify_db, + notify_db_session, template=sample_email_template, - status='sending', reference='ref', + status='sending', + sent_at=datetime.utcnow() ) + callback_api = create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") assert get_notification_by_id(notification.id).status == 'sending' - assert process_ses_results(ses_notification_callback(reference='ref')) assert get_notification_by_id(notification.id).status == 'delivered' statsd_client.timing_with_dates.assert_any_call( - "callback.ses.delivered.elapsed-time", datetime.utcnow(), notification.sent_at + "callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at ) statsd_client.incr.assert_any_call("callback.ses.delivered") updated_notification = Notification.query.get(notification.id) - send_mock.assert_called_once_with(updated_notification) + encrypted_data = create_delivery_status_callback_data(updated_notification, callback_api) + send_mock.assert_called_once_with([str(notification.id), encrypted_data], queue="service-callbacks") def test_ses_callback_should_not_update_notification_status_if_already_delivered(sample_email_template, mocker): mock_dup = mocker.patch('app.celery.process_ses_receipts_tasks.notifications_dao._duplicate_update_warning') - mock_upd = mocker.patch( - 'app.celery.process_ses_receipts_tasks.notifications_dao.dao_update_notifications_by_reference' - ) + mock_upd = mocker.patch('app.celery.process_ses_receipts_tasks.notifications_dao._update_notification_status') notification = create_notification(template=sample_email_template, reference='ref', status='delivered') - assert process_ses_results(ses_notification_callback(reference='ref')) is None assert get_notification_by_id(notification.id).status == 'delivered' - - mock_dup.assert_called_once_with(notification=notification, status='delivered') + mock_dup.assert_called_once_with(notification, 'delivered') assert mock_upd.call_count == 0 -def test_ses_callback_should_retry_if_notification_is_new(client, notify_db_session, mocker): +def test_ses_callback_should_retry_if_notification_is_new(client, _notify_db, mocker): mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') - with freeze_time('2017-11-17T12:14:03.646Z'): assert process_ses_results(ses_notification_callback(reference='ref')) is None assert mock_logger.call_count == 0 assert mock_retry.call_count == 1 - - -def test_ses_callback_should_log_if_notification_is_missing(client, notify_db_session, mocker): +def test_ses_callback_should_log_if_notification_is_missing(client, _notify_db, mocker): mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.warning') - with freeze_time('2017-11-17T12:34:03.646Z'): assert process_ses_results(ses_notification_callback(reference='ref')) is None assert mock_retry.call_count == 0 mock_logger.assert_called_once_with('notification not found for reference: ref (update to delivered)') - - -def test_ses_callback_should_not_retry_if_notification_is_old(client, notify_db_session, mocker): +def test_ses_callback_should_not_retry_if_notification_is_old(client, _notify_db, mocker): mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') - with freeze_time('2017-11-21T12:14:03.646Z'): assert process_ses_results(ses_notification_callback(reference='ref')) is None assert mock_logger.call_count == 0 assert mock_retry.call_count == 0 - - -def test_ses_callback_should_update_multiple_notification_status_sent( +def test_ses_callback_does_not_call_send_delivery_status_if_no_db_entry( client, + _notify_db, + notify_db_session, + sample_email_template, + mocker): + with freeze_time('2001-01-01T12:00:00'): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + _notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + assert get_notification_by_id(notification.id).status == 'sending' + assert process_ses_results(ses_notification_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'delivered' + send_mock.assert_not_called() +def test_ses_callback_should_update_multiple_notification_status_sent( + client, + _notify_db, notify_db_session, sample_email_template, mocker): - send_mock = mocker.patch( - 'app.celery.process_ses_receipts_tasks.check_and_queue_callback_task' + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' ) - create_notification( + create_sample_notification( + _notify_db, + notify_db_session, template=sample_email_template, - status='sending', reference='ref1', - ) - create_notification( + sent_at=datetime.utcnow(), + status='sending') + create_sample_notification( + _notify_db, + notify_db_session, template=sample_email_template, - status='sending', reference='ref2', - ) - create_notification( + sent_at=datetime.utcnow(), + status='sending') + create_sample_notification( + _notify_db, + notify_db_session, template=sample_email_template, - status='sending', reference='ref3', - ) + sent_at=datetime.utcnow(), + status='sending') + create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") assert process_ses_results(ses_notification_callback(reference='ref1')) assert process_ses_results(ses_notification_callback(reference='ref2')) assert process_ses_results(ses_notification_callback(reference='ref3')) assert send_mock.called - - def test_ses_callback_should_set_status_to_temporary_failure(client, + _notify_db, notify_db_session, sample_email_template, mocker): send_mock = mocker.patch( - 'app.celery.process_ses_receipts_tasks.check_and_queue_callback_task' + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' ) - mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.info') - notification = create_notification( + notification = create_sample_notification( + _notify_db, + notify_db_session, template=sample_email_template, - status='sending', reference='ref', + status='sending', + sent_at=datetime.utcnow() ) + create_service_callback_api(service=notification.service, url="https://original_url.com") assert get_notification_by_id(notification.id).status == 'sending' assert process_ses_results(ses_soft_bounce_callback(reference='ref')) assert get_notification_by_id(notification.id).status == 'temporary-failure' assert send_mock.called - assert f'SES bounce for notification ID {notification.id}: ' in mock_logger.call_args[0][0] - - def test_ses_callback_should_set_status_to_permanent_failure(client, + _notify_db, notify_db_session, sample_email_template, mocker): send_mock = mocker.patch( - 'app.celery.process_ses_receipts_tasks.check_and_queue_callback_task' + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' ) - mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.info') - notification = create_notification( + notification = create_sample_notification( + _notify_db, + notify_db_session, template=sample_email_template, - status='sending', reference='ref', + status='sending', + sent_at=datetime.utcnow() ) - + create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") assert get_notification_by_id(notification.id).status == 'sending' assert process_ses_results(ses_hard_bounce_callback(reference='ref')) assert get_notification_by_id(notification.id).status == 'permanent-failure' assert send_mock.called - assert f'SES bounce for notification ID {notification.id}: ' in mock_logger.call_args[0][0] - - def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email_template, mocker): send_mock = mocker.patch( 'app.celery.service_callback_tasks.send_complaint_to_service.apply_async' @@ -210,13 +305,11 @@ def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email create_service_callback_api( service=sample_email_template.service, url="https://original_url.com", callback_type="complaint" ) - notification = create_notification( template=sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending' ) response = ses_complaint_callback() assert process_ses_results(response) - assert send_mock.call_count == 1 assert encryption.decrypt(send_mock.call_args[0][0][0]) == { 'complaint_date': '2018-06-05T13:59:58.000000Z', @@ -227,3 +320,4 @@ def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email 'service_callback_api_url': 'https://original_url.com', 'to': 'recipient1@example.com' } + \ No newline at end of file diff --git a/tests/app/conftest.py b/tests/app/conftest.py index 61b87ca8b..206a5a2d5 100644 --- a/tests/app/conftest.py +++ b/tests/app/conftest.py @@ -31,6 +31,7 @@ from app.models import ( KEY_TYPE_TEAM, KEY_TYPE_TEST, LETTER_TYPE, + NOTIFICATION_STATUS_TYPES_COMPLETED, SERVICE_PERMISSION_TYPES, SMS_TYPE, ApiKey, @@ -69,6 +70,96 @@ def rmock(): yield rmock +def create_sample_notification( + notify_db, + notify_db_session, + service=None, + template=None, + job=None, + job_row_number=None, + to_field=None, + status="created", + provider_response=None, + reference=None, + created_at=None, + sent_at=None, + billable_units=1, + personalisation=None, + api_key=None, + key_type=KEY_TYPE_NORMAL, + sent_by=None, + international=False, + client_reference=None, + rate_multiplier=1.0, + scheduled_for=None, + normalised_to=None, + postage=None, +): + if created_at is None: + created_at = datetime.utcnow() + if service is None: + service = create_service(check_if_service_exists=True) + if template is None: + template = create_template(service=service) + + if job is None and api_key is None: + # we didn't specify in test - lets create it + api_key = ApiKey.query.filter(ApiKey.service == template.service, ApiKey.key_type == key_type).first() + if not api_key: + api_key = create_api_key(template.service, key_type=key_type) + + notification_id = uuid.uuid4() + + if to_field: + to = to_field + else: + to = "+16502532222" + + data = { + "id": notification_id, + "to": to, + "job_id": job.id if job else None, + "job": job, + "service_id": service.id, + "service": service, + "template_id": template.id, + "template_version": template.version, + "status": status, + "provider_response": provider_response, + "reference": reference, + "created_at": created_at, + "sent_at": sent_at, + "billable_units": billable_units, + "personalisation": personalisation, + "notification_type": template.template_type, + "api_key": api_key, + "api_key_id": api_key and api_key.id, + "key_type": api_key.key_type if api_key else key_type, + "sent_by": sent_by, + "updated_at": created_at if status in NOTIFICATION_STATUS_TYPES_COMPLETED else None, + "client_reference": client_reference, + "rate_multiplier": rate_multiplier, + "normalised_to": normalised_to, + "postage": postage, + } + if job_row_number is not None: + data["job_row_number"] = job_row_number + notification = Notification(**data) + dao_create_notification(notification) + # if scheduled_for: + # scheduled_notification = ScheduledNotification( + # id=uuid.uuid4(), + # notification_id=notification.id, + # scheduled_for=datetime.strptime(scheduled_for, "%Y-%m-%d %H:%M"), + # ) + # if status != "created": + # scheduled_notification.pending = False + # db.session.add(scheduled_notification) + # db.session.commit() + + return notification + + @pytest.fixture(scope='function') def service_factory(sample_user): class ServiceFactory(object): diff --git a/tests/app/v2/notifications/test_get_notifications.py b/tests/app/v2/notifications/test_get_notifications.py index 299b1d6e2..7c93f9aad 100644 --- a/tests/app/v2/notifications/test_get_notifications.py +++ b/tests/app/v2/notifications/test_get_notifications.py @@ -68,6 +68,7 @@ def test_get_notification_by_id_returns_200( 'completed_at': sample_notification.completed_at(), 'scheduled_for': None, 'postage': None, + 'provider_response': None } assert json_response == expected_response @@ -120,6 +121,7 @@ def test_get_notification_by_id_with_placeholders_returns_200( 'completed_at': sample_notification.completed_at(), 'scheduled_for': None, 'postage': None, + 'provider_response': None } assert json_response == expected_response