diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index ad88fcc3c..ebc04ba92 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -1,19 +1,80 @@ -from flask import current_app +from datetime import datetime, timedelta + +import iso8601 +from flask import current_app, json from notifications_utils.statsd_decorators import statsd +from sqlalchemy.orm.exc import NoResultFound -from app import notify_celery +from app import notify_celery, statsd_client from app.config import QueueNames +from app.clients.email.aws_ses import get_aws_responses +from app.dao import notifications_dao +from app.models import NOTIFICATION_SENDING, NOTIFICATION_PENDING -from app.notifications.notifications_ses_callback import process_ses_response +from app.notifications.notifications_ses_callback import ( + determine_notification_bounce_type, + handle_complaint, + _check_and_queue_complaint_callback_task, + _check_and_queue_callback_task, +) @notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def process_ses_results(self, response): try: - errors = process_ses_response(response) - if errors: - current_app.logger.error(errors) - except Exception: - current_app.logger.exception('Error processing SES results') + ses_message = json.loads(response['Message']) + notification_type = ses_message['notificationType'] + + if notification_type == 'Bounce': + notification_type = determine_notification_bounce_type(notification_type, ses_message) + elif notification_type == 'Complaint': + _check_and_queue_complaint_callback_task(*handle_complaint(ses_message)) + return True + + aws_response_dict = get_aws_responses(notification_type) + + notification_status = aws_response_dict['notification_status'] + reference = ses_message['mail']['messageId'] + + try: + notification = notifications_dao.dao_get_notification_by_reference(reference) + except NoResultFound: + message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None) + if datetime.utcnow() - message_time < timedelta(minutes=10): + self.retry(queue=QueueNames.RETRY) + elif datetime.utcnow() - message_time < timedelta(days=3): + current_app.logger.error( + "notification not found for reference: {} (update to {})".format(reference, notification_status) + ) + return + + if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}: + notifications_dao._duplicate_update_warning(notification, notification_status) + return + + notifications_dao._update_notification_status(notification=notification, status=notification_status) + + if not aws_response_dict['success']: + current_app.logger.info( + "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( + notification.id, reference, aws_response_dict['message'] + ) + ) + else: + current_app.logger.info('SES callback return status of {} for notification: {}'.format( + notification_status, notification.id + )) + + statsd_client.incr('callback.ses.{}'.format(notification_status)) + + if notification.sent_at: + statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at) + + _check_and_queue_callback_task(notification) + + return True + + except Exception as e: + current_app.logger.exception('Error processing SES results: {}'.format(type(e))) self.retry(queue=QueueNames.RETRY) diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index d75abcd4e..496ad9321 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -1,22 +1,11 @@ -from datetime import datetime +from flask import current_app -from flask import ( - current_app, - json -) - -from app import statsd_client -from app.clients.email.aws_ses import get_aws_responses -from app.dao import ( - notifications_dao -) from app.dao.complaint_dao import save_complaint from app.dao.notifications_dao import dao_get_notification_history_by_reference from app.dao.service_callback_api_dao import ( get_service_delivery_status_callback_api_for_service, get_service_complaint_callback_api_for_service ) from app.models import Complaint -from app.notifications.process_client_response import validate_callback_data from app.celery.service_callback_tasks import ( send_delivery_status_to_service, send_complaint_to_service, @@ -26,76 +15,6 @@ from app.celery.service_callback_tasks import ( from app.config import QueueNames -def process_ses_response(ses_request): - client_name = 'SES' - try: - errors = validate_callback_data(data=ses_request, fields=['Message'], client_name=client_name) - if errors: - return errors - - ses_message = json.loads(ses_request['Message']) - errors = validate_callback_data(data=ses_message, fields=['notificationType'], client_name=client_name) - if errors: - return errors - - notification_type = ses_message['notificationType'] - if notification_type == 'Bounce': - notification_type = determine_notification_bounce_type(notification_type, ses_message) - elif notification_type == 'Complaint': - complaint, notification, recipient = handle_complaint(ses_message) - _check_and_queue_complaint_callback_task(complaint, notification, recipient) - return - - try: - aws_response_dict = get_aws_responses(notification_type) - except KeyError: - error = "{} callback failed: status {} not found".format(client_name, notification_type) - return error - - notification_status = aws_response_dict['notification_status'] - - try: - reference = ses_message['mail']['messageId'] - notification = notifications_dao.update_notification_status_by_reference( - reference, - notification_status - ) - if not notification: - return - - if not aws_response_dict['success']: - current_app.logger.info( - "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( - notification.id, - reference, - aws_response_dict['message'] - ) - ) - else: - current_app.logger.info('{} callback return status of {} for notification: {}'.format( - client_name, - notification_status, - notification.id)) - statsd_client.incr('callback.ses.{}'.format(notification_status)) - if notification.sent_at: - statsd_client.timing_with_dates( - 'callback.ses.elapsed-time'.format(client_name.lower()), - datetime.utcnow(), - notification.sent_at - ) - - _check_and_queue_callback_task(notification) - return - - except KeyError: - error = "SES callback failed: messageId missing" - return error - - except ValueError: - error = "{} callback failed: invalid json".format(client_name) - return error - - def determine_notification_bounce_type(notification_type, ses_message): remove_emails_from_bounce(ses_message) current_app.logger.info('SES bounce dict: {}'.format(ses_message)) diff --git a/tests/app/celery/test_process_ses_receipts_tasks.py b/tests/app/celery/test_process_ses_receipts_tasks.py index 80ff799aa..3cd27cfd1 100644 --- a/tests/app/celery/test_process_ses_receipts_tasks.py +++ b/tests/app/celery/test_process_ses_receipts_tasks.py @@ -1,40 +1,38 @@ import json from datetime import datetime + +from freezegun import freeze_time + + +from app import statsd_client, encryption from app.celery.process_ses_receipts_tasks import process_ses_results -from app.celery.research_mode_tasks import ses_hard_bounce_callback -from app.models import Complaint +from app.celery.research_mode_tasks import ses_hard_bounce_callback, ses_soft_bounce_callback, ses_notification_callback +from app.celery.service_callback_tasks import create_delivery_status_callback_data +from app.dao.notifications_dao import get_notification_by_id +from app.models import Complaint, Notification from app.notifications.notifications_ses_callback import remove_emails_from_complaint, remove_emails_from_bounce from tests.app.db import ( - create_notification, ses_complaint_callback, - ses_notification_callback, + create_notification, + ses_complaint_callback, + create_service_callback_api, ) +from tests.app.conftest import sample_notification as create_sample_notification def test_process_ses_results(sample_email_template): - create_notification( - sample_email_template, - reference='ref1', - sent_at=datetime.utcnow(), - status='sending') + create_notification(sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending') - response = json.loads(ses_notification_callback()) - assert process_ses_results(response=response) is None + assert process_ses_results(response=ses_notification_callback(reference='ref1')) -def test_process_ses_results_does_not_retry_if_errors(notify_db, mocker): +def test_process_ses_results_retry_called(sample_email_template, notify_db, mocker): + create_notification(sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending') + + mocker.patch("app.dao.notifications_dao._update_notification_status", side_effect=Exception("EXPECTED")) mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') - response = json.loads(ses_notification_callback()) - process_ses_results(response=response) - assert mocked.call_count == 0 - - -def test_process_ses_results_retry_called(notify_db, mocker): - mocker.patch("app.dao.notifications_dao.update_notification_status_by_reference", side_effect=Exception("EXPECTED")) - mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') - response = json.loads(ses_notification_callback()) - process_ses_results(response=response) + process_ses_results(response=ses_notification_callback(reference='ref1')) assert mocked.call_count != 0 @@ -58,3 +56,221 @@ def test_remove_email_from_bounce(): test_json = json.loads(ses_hard_bounce_callback(reference='ref1')['Message']) remove_emails_from_bounce(test_json) assert "bounce@simulator.amazonses.com" not in json.dumps(test_json) + + +def test_ses_callback_should_update_notification_status( + client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + with freeze_time('2001-01-01T12:00:00'): + mocker.patch('app.statsd_client.incr') + mocker.patch('app.statsd_client.timing_with_dates') + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + callback_api = create_service_callback_api(service=sample_email_template.service, + url="https://original_url.com") + assert get_notification_by_id(notification.id).status == 'sending' + + assert process_ses_results(ses_notification_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'delivered' + statsd_client.timing_with_dates.assert_any_call( + "callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at + ) + statsd_client.incr.assert_any_call("callback.ses.delivered") + updated_notification = Notification.query.get(notification.id) + encrypted_data = create_delivery_status_callback_data(updated_notification, callback_api) + send_mock.assert_called_once_with([str(notification.id), encrypted_data], queue="service-callbacks") + + +def test_ses_callback_should_not_update_notification_status_if_already_delivered(sample_email_template, mocker): + mock_dup = mocker.patch('app.celery.process_ses_receipts_tasks.notifications_dao._duplicate_update_warning') + mock_upd = mocker.patch('app.celery.process_ses_receipts_tasks.notifications_dao._update_notification_status') + notification = create_notification(template=sample_email_template, reference='ref', status='delivered') + + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert get_notification_by_id(notification.id).status == 'delivered' + + mock_dup.assert_called_once_with(notification, 'delivered') + assert mock_upd.call_count == 0 + + +def test_ses_callback_should_retry_if_notification_is_new(client, notify_db, mocker): + mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') + mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') + + with freeze_time('2017-11-17T12:14:03.646Z'): + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert mock_logger.call_count == 0 + assert mock_retry.call_count == 1 + + +def test_ses_callback_should_log_if_notification_is_missing(client, notify_db, mocker): + mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') + mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') + + with freeze_time('2017-11-17T12:34:03.646Z'): + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert mock_retry.call_count == 0 + mock_logger.assert_called_once_with('notification not found for reference: ref (update to delivered)') + + +def test_ses_callback_should_not_retry_if_notification_is_old(client, notify_db, mocker): + mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') + mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') + + with freeze_time('2017-11-21T12:14:03.646Z'): + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert mock_logger.call_count == 0 + assert mock_retry.call_count == 0 + + +def test_ses_callback_does_not_call_send_delivery_status_if_no_db_entry( + client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + with freeze_time('2001-01-01T12:00:00'): + + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + + assert get_notification_by_id(notification.id).status == 'sending' + + assert process_ses_results(ses_notification_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'delivered' + + send_mock.assert_not_called() + + +def test_ses_callback_should_update_multiple_notification_status_sent( + client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref1', + sent_at=datetime.utcnow(), + status='sending') + + create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref2', + sent_at=datetime.utcnow(), + status='sending') + + create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref3', + sent_at=datetime.utcnow(), + status='sending') + create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") + assert process_ses_results(ses_notification_callback(reference='ref1')) + assert process_ses_results(ses_notification_callback(reference='ref2')) + assert process_ses_results(ses_notification_callback(reference='ref3')) + assert send_mock.called + + +def test_ses_callback_should_set_status_to_temporary_failure(client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + create_service_callback_api(service=notification.service, url="https://original_url.com") + assert get_notification_by_id(notification.id).status == 'sending' + assert process_ses_results(ses_soft_bounce_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'temporary-failure' + assert send_mock.called + + +def test_ses_callback_should_set_status_to_permanent_failure(client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") + + assert get_notification_by_id(notification.id).status == 'sending' + assert process_ses_results(ses_hard_bounce_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'permanent-failure' + assert send_mock.called + + +def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email_template, mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_complaint_to_service.apply_async' + ) + create_service_callback_api( + service=sample_email_template.service, url="https://original_url.com", callback_type="complaint" + ) + + notification = create_notification( + template=sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending' + ) + response = ses_complaint_callback() + assert process_ses_results(response) + + assert send_mock.call_count == 1 + assert encryption.decrypt(send_mock.call_args[0][0][0]) == { + 'complaint_date': '2018-06-05T13:59:58.000000Z', + 'complaint_id': str(Complaint.query.one().id), + 'notification_id': str(notification.id), + 'reference': None, + 'service_callback_api_bearer_token': 'some_super_secret', + 'service_callback_api_url': 'https://original_url.com', + 'to': 'recipient1@example.com' + } diff --git a/tests/app/notifications/test_notifications_ses_callback.py b/tests/app/notifications/test_notifications_ses_callback.py index 72166b9e8..80e773768 100644 --- a/tests/app/notifications/test_notifications_ses_callback.py +++ b/tests/app/notifications/test_notifications_ses_callback.py @@ -2,155 +2,20 @@ from datetime import datetime import pytest from flask import json -from freezegun import freeze_time from sqlalchemy.exc import SQLAlchemyError -from app import statsd_client, encryption from app.dao.notifications_dao import get_notification_by_id -from app.models import Notification, Complaint -from app.notifications.notifications_ses_callback import ( - process_ses_response, - handle_complaint -) -from app.celery.research_mode_tasks import ses_hard_bounce_callback, ses_soft_bounce_callback, ses_notification_callback -from app.celery.service_callback_tasks import create_delivery_status_callback_data +from app.models import Complaint +from app.notifications.notifications_ses_callback import handle_complaint from tests.app.conftest import sample_notification as create_sample_notification from tests.app.db import ( - create_service_callback_api, create_notification, ses_complaint_callback_malformed_message_id, + create_notification, ses_complaint_callback_malformed_message_id, ses_complaint_callback_with_missing_complaint_type, ses_complaint_callback ) -def test_ses_callback_should_update_notification_status( - client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - with freeze_time('2001-01-01T12:00:00'): - mocker.patch('app.statsd_client.incr') - mocker.patch('app.statsd_client.timing_with_dates') - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - callback_api = create_service_callback_api(service=sample_email_template.service, - url="https://original_url.com") - assert get_notification_by_id(notification.id).status == 'sending' - - errors = process_ses_response(ses_notification_callback(reference='ref')) - assert errors is None - assert get_notification_by_id(notification.id).status == 'delivered' - statsd_client.timing_with_dates.assert_any_call( - "callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at - ) - statsd_client.incr.assert_any_call("callback.ses.delivered") - updated_notification = Notification.query.get(notification.id) - encrypted_data = create_delivery_status_callback_data(updated_notification, callback_api) - send_mock.assert_called_once_with([str(notification.id), encrypted_data], queue="service-callbacks") - - -def test_ses_callback_does_not_call_send_delivery_status_if_no_db_entry( - client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - with freeze_time('2001-01-01T12:00:00'): - - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - - assert get_notification_by_id(notification.id).status == 'sending' - - errors = process_ses_response(ses_notification_callback(reference='ref')) - assert errors is None - assert get_notification_by_id(notification.id).status == 'delivered' - - send_mock.assert_not_called() - - -def test_ses_callback_should_update_multiple_notification_status_sent( - client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref1', - sent_at=datetime.utcnow(), - status='sending') - - create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref2', - sent_at=datetime.utcnow(), - status='sending') - - create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref3', - sent_at=datetime.utcnow(), - status='sending') - create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") - assert process_ses_response(ses_notification_callback(reference='ref1')) is None - assert process_ses_response(ses_notification_callback(reference='ref2')) is None - assert process_ses_response(ses_notification_callback(reference='ref3')) is None - assert send_mock.called - - -def test_ses_callback_should_set_status_to_temporary_failure(client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - create_service_callback_api(service=notification.service, url="https://original_url.com") - assert get_notification_by_id(notification.id).status == 'sending' - assert process_ses_response(ses_soft_bounce_callback(reference='ref')) is None - assert get_notification_by_id(notification.id).status == 'temporary-failure' - assert send_mock.called - - def test_ses_callback_should_not_set_status_once_status_is_delivered(client, notify_db, notify_db_session, @@ -168,30 +33,6 @@ def test_ses_callback_should_not_set_status_once_status_is_delivered(client, assert get_notification_by_id(notification.id).status == 'delivered' -def test_ses_callback_should_set_status_to_permanent_failure(client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") - - assert get_notification_by_id(notification.id).status == 'sending' - assert process_ses_response(ses_hard_bounce_callback(reference='ref')) is None - assert get_notification_by_id(notification.id).status == 'permanent-failure' - assert send_mock.called - - def test_process_ses_results_in_complaint(sample_email_template): notification = create_notification(template=sample_email_template, reference='ref1') handle_complaint(json.loads(ses_complaint_callback()['Message'])) @@ -220,28 +61,3 @@ def test_process_ses_results_in_complaint_save_complaint_with_null_complaint_typ assert len(complaints) == 1 assert complaints[0].notification_id == notification.id assert not complaints[0].complaint_type - - -def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email_template, mocker): - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_complaint_to_service.apply_async' - ) - create_service_callback_api( - service=sample_email_template.service, url="https://original_url.com", callback_type="complaint" - ) - - notification = create_notification(template=sample_email_template, reference='ref1') - response = ses_complaint_callback() - errors = process_ses_response(response) - assert errors is None - - assert send_mock.call_count == 1 - assert encryption.decrypt(send_mock.call_args[0][0][0]) == { - 'complaint_date': '2018-06-05T13:59:58.000000Z', - 'complaint_id': str(Complaint.query.one().id), - 'notification_id': str(notification.id), - 'reference': None, - 'service_callback_api_bearer_token': 'some_super_secret', - 'service_callback_api_url': 'https://original_url.com', - 'to': 'recipient1@example.com' - }