diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index bfa21ad89..8f743fa95 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -97,13 +97,15 @@ def sns_callback_handler(): @notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300) -@statsd(namespace="tasks") def process_ses_results(self, response): try: ses_message = json.loads(response["Message"]) notification_type = ses_message["notificationType"] - print(f"ses_message is: {ses_message}") - if notification_type == "Complaint": + bounce_message = None + + if notification_type == 'Bounce': + bounce_message = _determine_notification_bounce_type(ses_message) + elif notification_type == 'Complaint': _check_and_queue_complaint_callback_task(*handle_complaint(ses_message)) return True @@ -111,21 +113,26 @@ def process_ses_results(self, response): notification_status = aws_response_dict["notification_status"] reference = ses_message["mail"]["messageId"] - - print(f"notification_status is: {notification_status}") try: notification = notifications_dao.dao_get_notification_by_reference(reference) except NoResultFound: message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace(tzinfo=None) if datetime.utcnow() - message_time < timedelta(minutes=5): + current_app.logger.info( + f"notification not found for reference: {reference} (while attempting update to {notification_status}). " + f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue" + ) self.retry(queue=QueueNames.RETRY) else: current_app.logger.warning( - "notification not found for reference: {} (update to {})".format(reference, notification_status) + "notification not found for reference: {} (while attempting update to {})".format(reference, notification_status) ) return + if bounce_message: + current_app.logger.info(f"SES bounce for notification ID {notification.id}: {bounce_message}") + if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}: notifications_dao._duplicate_update_warning( notification, @@ -166,71 +173,3 @@ def process_ses_results(self, response): current_app.logger.exception("Error processing SES results: {}".format(type(e))) self.retry(queue=QueueNames.RETRY) -# def process_ses_results(self, response): -# try: -# ses_message = json.loads(response['Message']) -# print(f"ses_message is {ses_message}") -# notification_type = ses_message['notificationType'] -# print(f"notification_type is {notification_type}") -# if notification_type == 'Bounce': -# notification_type = _determine_notification_bounce_type(ses_message) -# elif notification_type == 'Complaint': -# _check_and_queue_complaint_callback_task(*handle_complaint(ses_message)) -# return True -# aws_response_dict = get_aws_responses(notification_type) -# print(f"aws_response_dict is {aws_response_dict}") -# notification_status = aws_response_dict['notification_status'] -# print(f"notification_status is {notification_status}") -# reference = ses_message['mail']['messageId'] -# try: -# notification = notifications_dao.dao_get_notification_by_reference(reference) -# print(f"notification is {notification}") -# except NoResultFound: -# print(f"notification not found") -# message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None) -# if datetime.utcnow() - message_time < timedelta(minutes=5): -# self.retry(queue=QueueNames.RETRY) -# else: -# current_app.logger.warning( -# "notification not found for reference: {} (update to {})".format(reference, notification_status) -# ) -# return -# print(f"notification.status is {notification.status}") -# if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}: -# print(f"notification.status is not in [{NOTIFICATION_SENDING}, {NOTIFICATION_PENDING}]") -# notifications_dao._duplicate_update_warning(notification, notification_status) -# return -# notifications_dao._update_notification_status( -# notification=notification, -# status=notification_status, -# provider_response=None -# ) -# if not aws_response_dict['success']: -# current_app.logger.info( -# "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( -# notification.id, reference, aws_response_dict['message'] -# ) -# ) -# print( -# "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( -# notification.id, reference, aws_response_dict['message'] -# ) -# ) -# else: -# current_app.logger.info('SES callback return status of {} for notification: {}'.format( -# notification_status, notification.id -# )) -# print('SES callback return status of {} for notification: {}'.format( -# notification_status, notification.id -# )) -# statsd_client.incr('callback.ses.{}'.format(notification_status)) -# if notification.sent_at: -# statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at) -# check_and_queue_callback_task(notification) -# return True -# except Retry: -# raise -# except Exception as e: -# current_app.logger.exception('Error processing SES results: {}'.format(type(e))) -# self.retry(queue=QueueNames.RETRY) - \ No newline at end of file diff --git a/app/celery/tasks.py b/app/celery/tasks.py index d83bf7cc2..48c6f3b66 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -21,7 +21,7 @@ from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job from app.dao.notifications_dao import ( dao_get_last_notification_added_for_job_id, - dao_get_notification_or_history_by_reference, + dao_get_notification_history_by_reference, dao_update_notifications_by_reference, get_notification_by_id, update_notification_status_by_reference, @@ -547,7 +547,7 @@ def update_letter_notification(filename, temporary_failures, update): def check_billable_units(notification_update): - notification = dao_get_notification_or_history_by_reference(notification_update.reference) + notification = dao_get_notification_history_by_reference(notification_update.reference) if int(notification_update.page_count) != notification.billable_units: msg = 'Notification with id {} has {} billable_units but DVLA says page count is {}'.format( diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index e5698a3b5..efdbfaefb 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -592,18 +592,6 @@ def dao_get_notification_by_reference(reference): ).one() -def dao_get_notification_or_history_by_reference(reference): - try: - # This try except is necessary because in test keys and research mode does not create notification history. - # Otherwise we could just search for the NotificationHistory object - return Notification.query.filter( - Notification.reference == reference - ).one() - except NoResultFound: - return NotificationHistory.query.filter( - NotificationHistory.reference == reference - ).one() - def dao_get_notification_history_by_reference(reference): try: # This try except is necessary because in test keys and research mode does not create notification history. diff --git a/app/delivery/send_to_providers.py b/app/delivery/send_to_providers.py index 48c634e76..e3e8aa6b4 100644 --- a/app/delivery/send_to_providers.py +++ b/app/delivery/send_to_providers.py @@ -163,8 +163,8 @@ def update_notification_to_sending(notification, provider): notification.sent_at = datetime.utcnow() notification.sent_by = provider.name if notification.status not in NOTIFICATION_STATUS_TYPES_COMPLETED: - # We currently have no callback method for SNS - # TODO create celery task to request delivery receipts from cloudwatch api + # We currently have no callback method for SMS deliveries + # TODO create celery task to request SMS delivery receipts from cloudwatch api notification.status = NOTIFICATION_SENT if notification.notification_type == "sms" else NOTIFICATION_SENDING dao_update_notification(notification) diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index 1de6aeed6..a56de6f5a 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -112,8 +112,8 @@ def remove_mail_headers(dict_to_edit): def remove_emails_from_bounce(bounce_dict): remove_mail_headers(bounce_dict) - bounce_dict["mail"].pop("destination") - bounce_dict["bounce"].pop("bouncedRecipients") + bounce_dict["mail"].pop("destination", None) + bounce_dict["bounce"].pop("bouncedRecipients", None) def remove_emails_from_complaint(complaint_dict): @@ -121,6 +121,7 @@ def remove_emails_from_complaint(complaint_dict): complaint_dict["complaint"].pop("complainedRecipients") return complaint_dict["mail"].pop("destination") + def check_and_queue_callback_task(notification): # queue callback task only if the service_callback_api exists service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) @@ -128,17 +129,11 @@ def check_and_queue_callback_task(notification): notification_data = create_delivery_status_callback_data(notification, service_callback_api) send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS) -def check_and_queue_callback_task(notification): - # queue callback task only if the service_callback_api exists - service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) - if service_callback_api: - notification_data = create_delivery_status_callback_data(notification, service_callback_api) - send_delivery_status_to_service.apply_async([str(notification.id), notification_data], - queue=QueueNames.CALLBACKS) def _check_and_queue_complaint_callback_task(complaint, notification, recipient): # queue callback task only if the service_callback_api exists service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id) if service_callback_api: complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient) - send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) \ No newline at end of file + send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS) + \ No newline at end of file diff --git a/tests/app/celery/test_process_ses_receipts_tasks.py b/tests/app/celery/test_process_ses_receipts_tasks.py index 51fda72aa..934f76c79 100644 --- a/tests/app/celery/test_process_ses_receipts_tasks.py +++ b/tests/app/celery/test_process_ses_receipts_tasks.py @@ -191,7 +191,7 @@ def test_ses_callback_should_log_if_notification_is_missing(client, _notify_db, with freeze_time('2017-11-17T12:34:03.646Z'): assert process_ses_results(ses_notification_callback(reference='ref')) is None assert mock_retry.call_count == 0 - mock_logger.assert_called_once_with('notification not found for reference: ref (update to delivered)') + mock_logger.assert_called_once_with('notification not found for reference: ref (while attempting update to delivered)') def test_ses_callback_should_not_retry_if_notification_is_old(client, _notify_db, mocker): mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') diff --git a/tests/app/conftest.py b/tests/app/conftest.py index 206a5a2d5..f68afb94f 100644 --- a/tests/app/conftest.py +++ b/tests/app/conftest.py @@ -146,16 +146,6 @@ def create_sample_notification( data["job_row_number"] = job_row_number notification = Notification(**data) dao_create_notification(notification) - # if scheduled_for: - # scheduled_notification = ScheduledNotification( - # id=uuid.uuid4(), - # notification_id=notification.id, - # scheduled_for=datetime.strptime(scheduled_for, "%Y-%m-%d %H:%M"), - # ) - # if status != "created": - # scheduled_notification.pending = False - # db.session.add(scheduled_notification) - # db.session.commit() return notification diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index b7a4430d4..b05c7f649 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -15,7 +15,7 @@ from app.dao.notifications_dao import ( dao_get_letters_to_be_printed, dao_get_notification_by_reference, dao_get_notification_count_for_job_id, - dao_get_notification_or_history_by_reference, + dao_get_notification_history_by_reference, dao_get_notifications_by_recipient_or_reference, dao_timeout_notifications, dao_update_notification, @@ -1607,28 +1607,28 @@ def test_dao_get_notification_by_reference_with_no_matches_raises_error(notify_d dao_get_notification_by_reference('REF1') -def test_dao_get_notification_or_history_by_reference_with_one_match_returns_notification( +def test_dao_get_notification_history_by_reference_with_one_match_returns_notification( sample_letter_template ): create_notification(template=sample_letter_template, reference='REF1') - notification = dao_get_notification_or_history_by_reference('REF1') + notification = dao_get_notification_history_by_reference('REF1') assert notification.reference == 'REF1' -def test_dao_get_notification_or_history_by_reference_with_multiple_matches_raises_error( +def test_dao_get_notification_history_by_reference_with_multiple_matches_raises_error( sample_letter_template ): create_notification(template=sample_letter_template, reference='REF1') create_notification(template=sample_letter_template, reference='REF1') with pytest.raises(SQLAlchemyError): - dao_get_notification_or_history_by_reference('REF1') + dao_get_notification_history_by_reference('REF1') -def test_dao_get_notification_or_history_by_reference_with_no_matches_raises_error(notify_db_session): +def test_dao_get_notification_history_by_reference_with_no_matches_raises_error(notify_db_session): with pytest.raises(SQLAlchemyError): - dao_get_notification_or_history_by_reference('REF1') + dao_get_notification_history_by_reference('REF1') @pytest.mark.parametrize("notification_type",