From 2932b44eb81b0039ef0564def5ad4e70dcfcc96f Mon Sep 17 00:00:00 2001 From: Alexey Bezhan Date: Wed, 13 Feb 2019 11:35:31 +0000 Subject: [PATCH] Add retries for SES callbacks for recent notifications We've seen errors caused by what we suspect is a race condition when SES callback processing tries to look up the notification before the sender worker has saved notification reference from the SES POST response to the database. This adds a retry for SES callback task if the notification was not found and the message is less than 10 minutes old and removes the error log message for notifications older than 3 days (since they might no longer exist in the notifications table and would've been marked as failure by then either way). In order to be able to call retry and silence the error log based on notification time this change inlines `process_ses_response` and `update_notification_by_reference` functions into the celery task. It also removes a lot of defensive error-handling that doesn't appear to have been triggered in the last few months (for things like missing keys in SES callback data). --- app/celery/process_ses_receipts_tasks.py | 77 +++++- .../notifications_ses_callback.py | 83 +----- .../celery/test_process_ses_receipts_tasks.py | 260 ++++++++++++++++-- .../test_notifications_ses_callback.py | 190 +------------ 4 files changed, 311 insertions(+), 299 deletions(-) diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index ad88fcc3c..ebc04ba92 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -1,19 +1,80 @@ -from flask import current_app +from datetime import datetime, timedelta + +import iso8601 +from flask import current_app, json from notifications_utils.statsd_decorators import statsd +from sqlalchemy.orm.exc import NoResultFound -from app import notify_celery +from app import notify_celery, statsd_client from app.config import QueueNames +from app.clients.email.aws_ses import get_aws_responses +from app.dao import notifications_dao +from app.models import NOTIFICATION_SENDING, NOTIFICATION_PENDING -from app.notifications.notifications_ses_callback import process_ses_response +from app.notifications.notifications_ses_callback import ( + determine_notification_bounce_type, + handle_complaint, + _check_and_queue_complaint_callback_task, + _check_and_queue_callback_task, +) @notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300) @statsd(namespace="tasks") def process_ses_results(self, response): try: - errors = process_ses_response(response) - if errors: - current_app.logger.error(errors) - except Exception: - current_app.logger.exception('Error processing SES results') + ses_message = json.loads(response['Message']) + notification_type = ses_message['notificationType'] + + if notification_type == 'Bounce': + notification_type = determine_notification_bounce_type(notification_type, ses_message) + elif notification_type == 'Complaint': + _check_and_queue_complaint_callback_task(*handle_complaint(ses_message)) + return True + + aws_response_dict = get_aws_responses(notification_type) + + notification_status = aws_response_dict['notification_status'] + reference = ses_message['mail']['messageId'] + + try: + notification = notifications_dao.dao_get_notification_by_reference(reference) + except NoResultFound: + message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None) + if datetime.utcnow() - message_time < timedelta(minutes=10): + self.retry(queue=QueueNames.RETRY) + elif datetime.utcnow() - message_time < timedelta(days=3): + current_app.logger.error( + "notification not found for reference: {} (update to {})".format(reference, notification_status) + ) + return + + if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}: + notifications_dao._duplicate_update_warning(notification, notification_status) + return + + notifications_dao._update_notification_status(notification=notification, status=notification_status) + + if not aws_response_dict['success']: + current_app.logger.info( + "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( + notification.id, reference, aws_response_dict['message'] + ) + ) + else: + current_app.logger.info('SES callback return status of {} for notification: {}'.format( + notification_status, notification.id + )) + + statsd_client.incr('callback.ses.{}'.format(notification_status)) + + if notification.sent_at: + statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at) + + _check_and_queue_callback_task(notification) + + return True + + except Exception as e: + current_app.logger.exception('Error processing SES results: {}'.format(type(e))) self.retry(queue=QueueNames.RETRY) diff --git a/app/notifications/notifications_ses_callback.py b/app/notifications/notifications_ses_callback.py index d75abcd4e..496ad9321 100644 --- a/app/notifications/notifications_ses_callback.py +++ b/app/notifications/notifications_ses_callback.py @@ -1,22 +1,11 @@ -from datetime import datetime +from flask import current_app -from flask import ( - current_app, - json -) - -from app import statsd_client -from app.clients.email.aws_ses import get_aws_responses -from app.dao import ( - notifications_dao -) from app.dao.complaint_dao import save_complaint from app.dao.notifications_dao import dao_get_notification_history_by_reference from app.dao.service_callback_api_dao import ( get_service_delivery_status_callback_api_for_service, get_service_complaint_callback_api_for_service ) from app.models import Complaint -from app.notifications.process_client_response import validate_callback_data from app.celery.service_callback_tasks import ( send_delivery_status_to_service, send_complaint_to_service, @@ -26,76 +15,6 @@ from app.celery.service_callback_tasks import ( from app.config import QueueNames -def process_ses_response(ses_request): - client_name = 'SES' - try: - errors = validate_callback_data(data=ses_request, fields=['Message'], client_name=client_name) - if errors: - return errors - - ses_message = json.loads(ses_request['Message']) - errors = validate_callback_data(data=ses_message, fields=['notificationType'], client_name=client_name) - if errors: - return errors - - notification_type = ses_message['notificationType'] - if notification_type == 'Bounce': - notification_type = determine_notification_bounce_type(notification_type, ses_message) - elif notification_type == 'Complaint': - complaint, notification, recipient = handle_complaint(ses_message) - _check_and_queue_complaint_callback_task(complaint, notification, recipient) - return - - try: - aws_response_dict = get_aws_responses(notification_type) - except KeyError: - error = "{} callback failed: status {} not found".format(client_name, notification_type) - return error - - notification_status = aws_response_dict['notification_status'] - - try: - reference = ses_message['mail']['messageId'] - notification = notifications_dao.update_notification_status_by_reference( - reference, - notification_status - ) - if not notification: - return - - if not aws_response_dict['success']: - current_app.logger.info( - "SES delivery failed: notification id {} and reference {} has error found. Status {}".format( - notification.id, - reference, - aws_response_dict['message'] - ) - ) - else: - current_app.logger.info('{} callback return status of {} for notification: {}'.format( - client_name, - notification_status, - notification.id)) - statsd_client.incr('callback.ses.{}'.format(notification_status)) - if notification.sent_at: - statsd_client.timing_with_dates( - 'callback.ses.elapsed-time'.format(client_name.lower()), - datetime.utcnow(), - notification.sent_at - ) - - _check_and_queue_callback_task(notification) - return - - except KeyError: - error = "SES callback failed: messageId missing" - return error - - except ValueError: - error = "{} callback failed: invalid json".format(client_name) - return error - - def determine_notification_bounce_type(notification_type, ses_message): remove_emails_from_bounce(ses_message) current_app.logger.info('SES bounce dict: {}'.format(ses_message)) diff --git a/tests/app/celery/test_process_ses_receipts_tasks.py b/tests/app/celery/test_process_ses_receipts_tasks.py index 80ff799aa..3cd27cfd1 100644 --- a/tests/app/celery/test_process_ses_receipts_tasks.py +++ b/tests/app/celery/test_process_ses_receipts_tasks.py @@ -1,40 +1,38 @@ import json from datetime import datetime + +from freezegun import freeze_time + + +from app import statsd_client, encryption from app.celery.process_ses_receipts_tasks import process_ses_results -from app.celery.research_mode_tasks import ses_hard_bounce_callback -from app.models import Complaint +from app.celery.research_mode_tasks import ses_hard_bounce_callback, ses_soft_bounce_callback, ses_notification_callback +from app.celery.service_callback_tasks import create_delivery_status_callback_data +from app.dao.notifications_dao import get_notification_by_id +from app.models import Complaint, Notification from app.notifications.notifications_ses_callback import remove_emails_from_complaint, remove_emails_from_bounce from tests.app.db import ( - create_notification, ses_complaint_callback, - ses_notification_callback, + create_notification, + ses_complaint_callback, + create_service_callback_api, ) +from tests.app.conftest import sample_notification as create_sample_notification def test_process_ses_results(sample_email_template): - create_notification( - sample_email_template, - reference='ref1', - sent_at=datetime.utcnow(), - status='sending') + create_notification(sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending') - response = json.loads(ses_notification_callback()) - assert process_ses_results(response=response) is None + assert process_ses_results(response=ses_notification_callback(reference='ref1')) -def test_process_ses_results_does_not_retry_if_errors(notify_db, mocker): +def test_process_ses_results_retry_called(sample_email_template, notify_db, mocker): + create_notification(sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending') + + mocker.patch("app.dao.notifications_dao._update_notification_status", side_effect=Exception("EXPECTED")) mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') - response = json.loads(ses_notification_callback()) - process_ses_results(response=response) - assert mocked.call_count == 0 - - -def test_process_ses_results_retry_called(notify_db, mocker): - mocker.patch("app.dao.notifications_dao.update_notification_status_by_reference", side_effect=Exception("EXPECTED")) - mocked = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') - response = json.loads(ses_notification_callback()) - process_ses_results(response=response) + process_ses_results(response=ses_notification_callback(reference='ref1')) assert mocked.call_count != 0 @@ -58,3 +56,221 @@ def test_remove_email_from_bounce(): test_json = json.loads(ses_hard_bounce_callback(reference='ref1')['Message']) remove_emails_from_bounce(test_json) assert "bounce@simulator.amazonses.com" not in json.dumps(test_json) + + +def test_ses_callback_should_update_notification_status( + client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + with freeze_time('2001-01-01T12:00:00'): + mocker.patch('app.statsd_client.incr') + mocker.patch('app.statsd_client.timing_with_dates') + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + callback_api = create_service_callback_api(service=sample_email_template.service, + url="https://original_url.com") + assert get_notification_by_id(notification.id).status == 'sending' + + assert process_ses_results(ses_notification_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'delivered' + statsd_client.timing_with_dates.assert_any_call( + "callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at + ) + statsd_client.incr.assert_any_call("callback.ses.delivered") + updated_notification = Notification.query.get(notification.id) + encrypted_data = create_delivery_status_callback_data(updated_notification, callback_api) + send_mock.assert_called_once_with([str(notification.id), encrypted_data], queue="service-callbacks") + + +def test_ses_callback_should_not_update_notification_status_if_already_delivered(sample_email_template, mocker): + mock_dup = mocker.patch('app.celery.process_ses_receipts_tasks.notifications_dao._duplicate_update_warning') + mock_upd = mocker.patch('app.celery.process_ses_receipts_tasks.notifications_dao._update_notification_status') + notification = create_notification(template=sample_email_template, reference='ref', status='delivered') + + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert get_notification_by_id(notification.id).status == 'delivered' + + mock_dup.assert_called_once_with(notification, 'delivered') + assert mock_upd.call_count == 0 + + +def test_ses_callback_should_retry_if_notification_is_new(client, notify_db, mocker): + mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') + mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') + + with freeze_time('2017-11-17T12:14:03.646Z'): + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert mock_logger.call_count == 0 + assert mock_retry.call_count == 1 + + +def test_ses_callback_should_log_if_notification_is_missing(client, notify_db, mocker): + mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') + mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') + + with freeze_time('2017-11-17T12:34:03.646Z'): + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert mock_retry.call_count == 0 + mock_logger.assert_called_once_with('notification not found for reference: ref (update to delivered)') + + +def test_ses_callback_should_not_retry_if_notification_is_old(client, notify_db, mocker): + mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry') + mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error') + + with freeze_time('2017-11-21T12:14:03.646Z'): + assert process_ses_results(ses_notification_callback(reference='ref')) is None + assert mock_logger.call_count == 0 + assert mock_retry.call_count == 0 + + +def test_ses_callback_does_not_call_send_delivery_status_if_no_db_entry( + client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + with freeze_time('2001-01-01T12:00:00'): + + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + + assert get_notification_by_id(notification.id).status == 'sending' + + assert process_ses_results(ses_notification_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'delivered' + + send_mock.assert_not_called() + + +def test_ses_callback_should_update_multiple_notification_status_sent( + client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref1', + sent_at=datetime.utcnow(), + status='sending') + + create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref2', + sent_at=datetime.utcnow(), + status='sending') + + create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref3', + sent_at=datetime.utcnow(), + status='sending') + create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") + assert process_ses_results(ses_notification_callback(reference='ref1')) + assert process_ses_results(ses_notification_callback(reference='ref2')) + assert process_ses_results(ses_notification_callback(reference='ref3')) + assert send_mock.called + + +def test_ses_callback_should_set_status_to_temporary_failure(client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + create_service_callback_api(service=notification.service, url="https://original_url.com") + assert get_notification_by_id(notification.id).status == 'sending' + assert process_ses_results(ses_soft_bounce_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'temporary-failure' + assert send_mock.called + + +def test_ses_callback_should_set_status_to_permanent_failure(client, + notify_db, + notify_db_session, + sample_email_template, + mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' + ) + notification = create_sample_notification( + notify_db, + notify_db_session, + template=sample_email_template, + reference='ref', + status='sending', + sent_at=datetime.utcnow() + ) + create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") + + assert get_notification_by_id(notification.id).status == 'sending' + assert process_ses_results(ses_hard_bounce_callback(reference='ref')) + assert get_notification_by_id(notification.id).status == 'permanent-failure' + assert send_mock.called + + +def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email_template, mocker): + send_mock = mocker.patch( + 'app.celery.service_callback_tasks.send_complaint_to_service.apply_async' + ) + create_service_callback_api( + service=sample_email_template.service, url="https://original_url.com", callback_type="complaint" + ) + + notification = create_notification( + template=sample_email_template, reference='ref1', sent_at=datetime.utcnow(), status='sending' + ) + response = ses_complaint_callback() + assert process_ses_results(response) + + assert send_mock.call_count == 1 + assert encryption.decrypt(send_mock.call_args[0][0][0]) == { + 'complaint_date': '2018-06-05T13:59:58.000000Z', + 'complaint_id': str(Complaint.query.one().id), + 'notification_id': str(notification.id), + 'reference': None, + 'service_callback_api_bearer_token': 'some_super_secret', + 'service_callback_api_url': 'https://original_url.com', + 'to': 'recipient1@example.com' + } diff --git a/tests/app/notifications/test_notifications_ses_callback.py b/tests/app/notifications/test_notifications_ses_callback.py index 72166b9e8..80e773768 100644 --- a/tests/app/notifications/test_notifications_ses_callback.py +++ b/tests/app/notifications/test_notifications_ses_callback.py @@ -2,155 +2,20 @@ from datetime import datetime import pytest from flask import json -from freezegun import freeze_time from sqlalchemy.exc import SQLAlchemyError -from app import statsd_client, encryption from app.dao.notifications_dao import get_notification_by_id -from app.models import Notification, Complaint -from app.notifications.notifications_ses_callback import ( - process_ses_response, - handle_complaint -) -from app.celery.research_mode_tasks import ses_hard_bounce_callback, ses_soft_bounce_callback, ses_notification_callback -from app.celery.service_callback_tasks import create_delivery_status_callback_data +from app.models import Complaint +from app.notifications.notifications_ses_callback import handle_complaint from tests.app.conftest import sample_notification as create_sample_notification from tests.app.db import ( - create_service_callback_api, create_notification, ses_complaint_callback_malformed_message_id, + create_notification, ses_complaint_callback_malformed_message_id, ses_complaint_callback_with_missing_complaint_type, ses_complaint_callback ) -def test_ses_callback_should_update_notification_status( - client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - with freeze_time('2001-01-01T12:00:00'): - mocker.patch('app.statsd_client.incr') - mocker.patch('app.statsd_client.timing_with_dates') - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - callback_api = create_service_callback_api(service=sample_email_template.service, - url="https://original_url.com") - assert get_notification_by_id(notification.id).status == 'sending' - - errors = process_ses_response(ses_notification_callback(reference='ref')) - assert errors is None - assert get_notification_by_id(notification.id).status == 'delivered' - statsd_client.timing_with_dates.assert_any_call( - "callback.ses.elapsed-time", datetime.utcnow(), notification.sent_at - ) - statsd_client.incr.assert_any_call("callback.ses.delivered") - updated_notification = Notification.query.get(notification.id) - encrypted_data = create_delivery_status_callback_data(updated_notification, callback_api) - send_mock.assert_called_once_with([str(notification.id), encrypted_data], queue="service-callbacks") - - -def test_ses_callback_does_not_call_send_delivery_status_if_no_db_entry( - client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - with freeze_time('2001-01-01T12:00:00'): - - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - - assert get_notification_by_id(notification.id).status == 'sending' - - errors = process_ses_response(ses_notification_callback(reference='ref')) - assert errors is None - assert get_notification_by_id(notification.id).status == 'delivered' - - send_mock.assert_not_called() - - -def test_ses_callback_should_update_multiple_notification_status_sent( - client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref1', - sent_at=datetime.utcnow(), - status='sending') - - create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref2', - sent_at=datetime.utcnow(), - status='sending') - - create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref3', - sent_at=datetime.utcnow(), - status='sending') - create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") - assert process_ses_response(ses_notification_callback(reference='ref1')) is None - assert process_ses_response(ses_notification_callback(reference='ref2')) is None - assert process_ses_response(ses_notification_callback(reference='ref3')) is None - assert send_mock.called - - -def test_ses_callback_should_set_status_to_temporary_failure(client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - create_service_callback_api(service=notification.service, url="https://original_url.com") - assert get_notification_by_id(notification.id).status == 'sending' - assert process_ses_response(ses_soft_bounce_callback(reference='ref')) is None - assert get_notification_by_id(notification.id).status == 'temporary-failure' - assert send_mock.called - - def test_ses_callback_should_not_set_status_once_status_is_delivered(client, notify_db, notify_db_session, @@ -168,30 +33,6 @@ def test_ses_callback_should_not_set_status_once_status_is_delivered(client, assert get_notification_by_id(notification.id).status == 'delivered' -def test_ses_callback_should_set_status_to_permanent_failure(client, - notify_db, - notify_db_session, - sample_email_template, - mocker): - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_delivery_status_to_service.apply_async' - ) - notification = create_sample_notification( - notify_db, - notify_db_session, - template=sample_email_template, - reference='ref', - status='sending', - sent_at=datetime.utcnow() - ) - create_service_callback_api(service=sample_email_template.service, url="https://original_url.com") - - assert get_notification_by_id(notification.id).status == 'sending' - assert process_ses_response(ses_hard_bounce_callback(reference='ref')) is None - assert get_notification_by_id(notification.id).status == 'permanent-failure' - assert send_mock.called - - def test_process_ses_results_in_complaint(sample_email_template): notification = create_notification(template=sample_email_template, reference='ref1') handle_complaint(json.loads(ses_complaint_callback()['Message'])) @@ -220,28 +61,3 @@ def test_process_ses_results_in_complaint_save_complaint_with_null_complaint_typ assert len(complaints) == 1 assert complaints[0].notification_id == notification.id assert not complaints[0].complaint_type - - -def test_ses_callback_should_send_on_complaint_to_user_callback_api(sample_email_template, mocker): - send_mock = mocker.patch( - 'app.celery.service_callback_tasks.send_complaint_to_service.apply_async' - ) - create_service_callback_api( - service=sample_email_template.service, url="https://original_url.com", callback_type="complaint" - ) - - notification = create_notification(template=sample_email_template, reference='ref1') - response = ses_complaint_callback() - errors = process_ses_response(response) - assert errors is None - - assert send_mock.call_count == 1 - assert encryption.decrypt(send_mock.call_args[0][0][0]) == { - 'complaint_date': '2018-06-05T13:59:58.000000Z', - 'complaint_id': str(Complaint.query.one().id), - 'notification_id': str(notification.id), - 'reference': None, - 'service_callback_api_bearer_token': 'some_super_secret', - 'service_callback_api_url': 'https://original_url.com', - 'to': 'recipient1@example.com' - }