Merge branch 'main' into notify-300b

This commit is contained in:
stvnrlly
2023-08-31 10:28:44 -04:00
706 changed files with 40498 additions and 30898 deletions

View File

@@ -1,11 +1,11 @@
from datetime import datetime, timedelta
from flask import current_app
from notifications_utils.timezones import convert_utc_to_local_timezone
from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery
from app.aws import s3
from app.aws.s3 import remove_csv_object
from app.celery.process_ses_receipts_tasks import check_and_queue_callback_task
from app.config import QueueNames
from app.cronitor import cronitor
@@ -14,6 +14,7 @@ from app.dao.inbound_sms_dao import delete_inbound_sms_older_than_retention
from app.dao.jobs_dao import (
dao_archive_job,
dao_get_jobs_older_than_data_retention,
dao_get_unfinished_jobs,
)
from app.dao.notifications_dao import (
dao_get_notifications_processing_time_stats,
@@ -25,7 +26,7 @@ from app.dao.service_data_retention_dao import (
fetch_service_data_retention_for_all_services_by_notification_type,
)
from app.models import EMAIL_TYPE, SMS_TYPE, FactProcessingTime
from app.utils import get_local_midnight_in_utc
from app.utils import get_midnight_in_utc
@notify_celery.task(name="remove_sms_email_jobs")
@@ -42,68 +43,98 @@ def _remove_csv_files(job_types):
current_app.logger.info("Job ID {} has been removed from s3.".format(job.id))
@notify_celery.task(name="cleanup-unfinished-jobs")
def cleanup_unfinished_jobs():
now = datetime.utcnow()
jobs = dao_get_unfinished_jobs()
for job in jobs:
# The query already checks that the processing_finished time is null, so here we are saying
# if it started more than 4 hours ago, that's too long
acceptable_finish_time = job.processing_started + timedelta(minutes=5)
if now > acceptable_finish_time:
remove_csv_object(job.original_file_name)
dao_archive_job(job)
@notify_celery.task(name="delete-notifications-older-than-retention")
def delete_notifications_older_than_retention():
delete_email_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING)
delete_sms_notifications_older_than_retention.apply_async(queue=QueueNames.REPORTING)
delete_email_notifications_older_than_retention.apply_async(
queue=QueueNames.REPORTING
)
delete_sms_notifications_older_than_retention.apply_async(
queue=QueueNames.REPORTING
)
@notify_celery.task(name="delete-sms-notifications")
@cronitor("delete-sms-notifications")
def delete_sms_notifications_older_than_retention():
_delete_notifications_older_than_retention_by_type('sms')
_delete_notifications_older_than_retention_by_type("sms")
@notify_celery.task(name="delete-email-notifications")
@cronitor("delete-email-notifications")
def delete_email_notifications_older_than_retention():
_delete_notifications_older_than_retention_by_type('email')
_delete_notifications_older_than_retention_by_type("email")
def _delete_notifications_older_than_retention_by_type(notification_type):
flexible_data_retention = fetch_service_data_retention_for_all_services_by_notification_type(notification_type)
flexible_data_retention = (
fetch_service_data_retention_for_all_services_by_notification_type(
notification_type
)
)
for f in flexible_data_retention:
day_to_delete_backwards_from = get_local_midnight_in_utc(
convert_utc_to_local_timezone(datetime.utcnow()).date() - timedelta(days=f.days_of_retention)
day_to_delete_backwards_from = get_midnight_in_utc(
datetime.utcnow()
).date() - timedelta(days=f.days_of_retention)
delete_notifications_for_service_and_type.apply_async(
queue=QueueNames.REPORTING,
kwargs={
"service_id": f.service_id,
"notification_type": notification_type,
"datetime_to_delete_before": day_to_delete_backwards_from,
},
)
delete_notifications_for_service_and_type.apply_async(queue=QueueNames.REPORTING, kwargs={
'service_id': f.service_id,
'notification_type': notification_type,
'datetime_to_delete_before': day_to_delete_backwards_from
})
seven_days_ago = get_midnight_in_utc(datetime.utcnow()).date() - timedelta(days=7)
seven_days_ago = get_local_midnight_in_utc(
convert_utc_to_local_timezone(datetime.utcnow()).date() - timedelta(days=7)
)
service_ids_with_data_retention = {x.service_id for x in flexible_data_retention}
# get a list of all service ids that we'll need to delete for. Typically that might only be 5% of services.
# This query takes a couple of mins to run.
service_ids_that_have_sent_notifications_recently = get_service_ids_with_notifications_before(
notification_type,
seven_days_ago
service_ids_that_have_sent_notifications_recently = (
get_service_ids_with_notifications_before(notification_type, seven_days_ago)
)
service_ids_to_purge = service_ids_that_have_sent_notifications_recently - service_ids_with_data_retention
service_ids_to_purge = (
service_ids_that_have_sent_notifications_recently
- service_ids_with_data_retention
)
for service_id in service_ids_to_purge:
delete_notifications_for_service_and_type.apply_async(queue=QueueNames.REPORTING, kwargs={
'service_id': service_id,
'notification_type': notification_type,
'datetime_to_delete_before': seven_days_ago
})
delete_notifications_for_service_and_type.apply_async(
queue=QueueNames.REPORTING,
kwargs={
"service_id": service_id,
"notification_type": notification_type,
"datetime_to_delete_before": seven_days_ago,
},
)
current_app.logger.info(
f'delete-notifications-older-than-retention: triggered subtasks for notification_type {notification_type}: '
f'{len(service_ids_with_data_retention)} services with flexible data retention, '
f'{len(service_ids_to_purge)} services without flexible data retention'
f"delete-notifications-older-than-retention: triggered subtasks for notification_type {notification_type}: "
f"{len(service_ids_with_data_retention)} services with flexible data retention, "
f"{len(service_ids_to_purge)} services without flexible data retention"
)
@notify_celery.task(name='delete-notifications-for-service-and-type')
def delete_notifications_for_service_and_type(service_id, notification_type, datetime_to_delete_before):
@notify_celery.task(name="delete-notifications-for-service-and-type")
def delete_notifications_for_service_and_type(
service_id, notification_type, datetime_to_delete_before
):
start = datetime.utcnow()
num_deleted = move_notifications_to_notification_history(
notification_type,
@@ -113,21 +144,21 @@ def delete_notifications_for_service_and_type(service_id, notification_type, dat
if num_deleted:
end = datetime.utcnow()
current_app.logger.info(
f'delete-notifications-for-service-and-type: '
f'service: {service_id}, '
f'notification_type: {notification_type}, '
f'count deleted: {num_deleted}, '
f'duration: {(end - start).seconds} seconds'
f"delete-notifications-for-service-and-type: "
f"service: {service_id}, "
f"notification_type: {notification_type}, "
f"count deleted: {num_deleted}, "
f"duration: {(end - start).seconds} seconds"
)
@notify_celery.task(name='timeout-sending-notifications')
@cronitor('timeout-sending-notifications')
@notify_celery.task(name="timeout-sending-notifications")
@cronitor("timeout-sending-notifications")
def timeout_notifications():
notifications = ['dummy value so len() > 0']
notifications = ["dummy value so len() > 0"]
cutoff_time = datetime.utcnow() - timedelta(
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
seconds=current_app.config.get("SENDING_NOTIFICATIONS_TIMEOUT_PERIOD")
)
while len(notifications) > 0:
@@ -137,7 +168,10 @@ def timeout_notifications():
check_and_queue_callback_task(notification)
current_app.logger.info(
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
"Timeout period reached for {} notifications, status has been updated.".format(
len(notifications)
)
)
@notify_celery.task(name="delete-inbound-sms")
@@ -148,9 +182,7 @@ def delete_inbound_sms():
deleted = delete_inbound_sms_older_than_retention()
current_app.logger.info(
"Delete inbound sms job started {} finished {} deleted {} inbound sms notifications".format(
start,
datetime.utcnow(),
deleted
start, datetime.utcnow(), deleted
)
)
except SQLAlchemyError:
@@ -158,7 +190,7 @@ def delete_inbound_sms():
raise
@notify_celery.task(name='save-daily-notification-processing-time')
@notify_celery.task(name="save-daily-notification-processing-time")
@cronitor("save-daily-notification-processing-time")
def save_daily_notification_processing_time(local_date=None):
# local_date is a string in the format of "YYYY-MM-DD"
@@ -168,13 +200,13 @@ def save_daily_notification_processing_time(local_date=None):
else:
local_date = datetime.strptime(local_date, "%Y-%m-%d").date()
start_time = get_local_midnight_in_utc(local_date)
end_time = get_local_midnight_in_utc(local_date + timedelta(days=1))
start_time = get_midnight_in_utc(local_date)
end_time = get_midnight_in_utc(local_date + timedelta(days=1))
result = dao_get_notifications_processing_time_stats(start_time, end_time)
insert_update_processing_time(
FactProcessingTime(
local_date=local_date,
messages_total=result.messages_total,
messages_within_10_secs=result.messages_within_10_secs
messages_within_10_secs=result.messages_within_10_secs,
)
)

View File

@@ -23,7 +23,9 @@ from app.dao.service_callback_api_dao import (
from app.models import NOTIFICATION_PENDING, NOTIFICATION_SENDING, Complaint
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
@notify_celery.task(
bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300
)
def process_ses_results(self, response):
try:
ses_message = json.loads(response["Message"])
@@ -35,9 +37,9 @@ def process_ses_results(self, response):
)
bounce_message = None
if notification_type == 'Bounce':
if notification_type == "Bounce":
bounce_message = determine_notification_bounce_type(ses_message)
elif notification_type == 'Complaint':
elif notification_type == "Complaint":
_check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
return True
@@ -47,9 +49,13 @@ def process_ses_results(self, response):
reference = ses_message["mail"]["messageId"]
try:
notification = notifications_dao.dao_get_notification_by_reference(reference)
notification = notifications_dao.dao_get_notification_by_reference(
reference
)
except NoResultFound:
message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace(tzinfo=None)
message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace(
tzinfo=None
)
if datetime.utcnow() - message_time < timedelta(minutes=5):
current_app.logger.info(
f"Notification not found for reference: {reference}"
@@ -66,12 +72,13 @@ def process_ses_results(self, response):
return
if bounce_message:
current_app.logger.info(f"SES bounce for notification ID {notification.id}: {bounce_message}")
current_app.logger.info(
f"SES bounce for notification ID {notification.id}: {bounce_message}"
)
if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
notifications_dao._duplicate_update_warning(
notification,
notification_status
notification, notification_status
)
return
@@ -89,7 +96,9 @@ def process_ses_results(self, response):
)
else:
current_app.logger.info(
"SES callback return status of {} for notification: {}".format(notification_status, notification.id)
"SES callback return status of {} for notification: {}".format(
notification_status, notification.id
)
)
check_and_queue_callback_task(notification)
@@ -113,7 +122,11 @@ def determine_notification_bounce_type(ses_message):
raise KeyError(f"Unhandled sns notification type {notification_type}")
remove_emails_from_bounce(ses_message)
current_app.logger.info("SES bounce dict: {}".format(json.dumps(ses_message).replace("{", "(").replace("}", ")")))
current_app.logger.info(
"SES bounce dict: {}".format(
json.dumps(ses_message).replace("{", "(").replace("}", ")")
)
)
if ses_message["bounce"]["bounceType"] == "Permanent":
return "Permanent"
return "Temporary"
@@ -121,9 +134,9 @@ def determine_notification_bounce_type(ses_message):
def determine_notification_type(ses_message):
notification_type = ses_message["notificationType"]
if notification_type not in ["Bounce", "Complaint", "Delivery"]:
if notification_type not in ["Bounce", "Complaint", "Delivery"]:
raise KeyError(f"Unhandled sns notification type {notification_type}")
if notification_type == 'Bounce':
if notification_type == "Bounce":
return determine_notification_bounce_type(ses_message)
return notification_type
@@ -180,12 +193,16 @@ def get_aws_responses(ses_message):
def handle_complaint(ses_message):
recipient_email = remove_emails_from_complaint(ses_message)[0]
current_app.logger.info(
"Complaint from SES: \n{}".format(json.dumps(ses_message).replace("{", "(").replace("}", ")"))
"Complaint from SES: \n{}".format(
json.dumps(ses_message).replace("{", "(").replace("}", ")")
)
)
try:
reference = ses_message["mail"]["messageId"]
except KeyError as e:
current_app.logger.exception(f"Complaint from SES failed to get reference from message with error: {e}")
current_app.logger.exception(
f"Complaint from SES failed to get reference from message with error: {e}"
)
return
notification = dao_get_notification_history_by_reference(reference)
ses_complaint = ses_message.get("complaint", None)
@@ -193,8 +210,12 @@ def handle_complaint(ses_message):
complaint = Complaint(
notification_id=notification.id,
service_id=notification.service_id,
ses_feedback_id=ses_complaint.get("feedbackId", None) if ses_complaint else None,
complaint_type=ses_complaint.get("complaintFeedbackType", None) if ses_complaint else None,
ses_feedback_id=ses_complaint.get("feedbackId", None)
if ses_complaint
else None,
complaint_type=ses_complaint.get("complaintFeedbackType", None)
if ses_complaint
else None,
complaint_date=ses_complaint.get("timestamp", None) if ses_complaint else None,
)
save_complaint(complaint)
@@ -222,9 +243,13 @@ def remove_emails_from_complaint(complaint_dict):
def check_and_queue_callback_task(notification):
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
service_callback_api = get_service_delivery_status_callback_api_for_service(
service_id=notification.service_id
)
if service_callback_api:
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
notification_data = create_delivery_status_callback_data(
notification, service_callback_api
)
send_delivery_status_to_service.apply_async(
[str(notification.id), notification_data], queue=QueueNames.CALLBACKS
)
@@ -232,7 +257,13 @@ def check_and_queue_callback_task(notification):
def _check_and_queue_complaint_callback_task(complaint, notification, recipient):
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id)
service_callback_api = get_service_complaint_callback_api_for_service(
service_id=notification.service_id
)
if service_callback_api:
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
complaint_data = create_complaint_callback_data(
complaint, notification, service_callback_api, recipient
)
send_complaint_to_service.apply_async(
[complaint_data], queue=QueueNames.CALLBACKS
)

View File

@@ -1,6 +1,5 @@
from datetime import datetime, timedelta
from time import time
from zoneinfo import ZoneInfo
from flask import current_app
from sqlalchemy.orm.exc import NoResultFound
@@ -11,17 +10,25 @@ from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException
from app.clients.sms import SmsClientResponseException
from app.config import QueueNames
from app.dao import notifications_dao
from app.dao.notifications_dao import update_notification_status_by_id
from app.dao.notifications_dao import (
sanitize_successful_notification_by_id,
update_notification_status_by_id,
)
from app.delivery import send_to_providers
from app.exceptions import NotificationTechnicalFailureException
from app.models import (
NOTIFICATION_DELIVERED,
NOTIFICATION_FAILED,
NOTIFICATION_SENT,
NOTIFICATION_TECHNICAL_FAILURE,
)
@notify_celery.task(bind=True, name="check_sms_delivery_receipt", max_retries=48, default_retry_delay=300)
@notify_celery.task(
bind=True,
name="check_sms_delivery_receipt",
max_retries=48,
default_retry_delay=300,
)
def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
"""
This is called after deliver_sms to check the status of the message. This uses the same number of
@@ -32,38 +39,54 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
failure appears in the cloudwatch logs, so this should keep retrying until the log appears, or until
we run out of retries.
"""
status, provider_response = aws_cloudwatch_client.check_sms(message_id, notification_id, sent_at)
if status == 'success':
status = NOTIFICATION_SENT
else:
status, provider_response = aws_cloudwatch_client.check_sms(
message_id, notification_id, sent_at
)
if status == "success":
status = NOTIFICATION_DELIVERED
elif status == "failure":
status = NOTIFICATION_FAILED
update_notification_status_by_id(notification_id, status, provider_response=provider_response)
current_app.logger.info(f"Updated notification {notification_id} with response '{provider_response}'")
# if status is not success or failure the client raised an exception and this method will retry
if status == NOTIFICATION_DELIVERED:
sanitize_successful_notification_by_id(notification_id)
current_app.logger.info(
f"Sanitized notification {notification_id} that was successfully delivered"
)
else:
update_notification_status_by_id(
notification_id, status, provider_response=provider_response
)
current_app.logger.info(
f"Updated notification {notification_id} with response '{provider_response}'"
)
@notify_celery.task(bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300)
@notify_celery.task(
bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
)
def deliver_sms(self, notification_id):
try:
# Get the time we are doing the sending, to minimize the time period we need to check over for receipt
now = round(time() * 1000)
current_app.logger.info("Start sending SMS for notification id: {}".format(notification_id))
current_app.logger.info(
"Start sending SMS for notification id: {}".format(notification_id)
)
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
message_id = send_to_providers.send_sms_to_provider(notification)
# We have to put it in the default US/Eastern timezone. From zones west of there, the delay
# We have to put it in UTC. For other timezones, the delay
# will be ignored and it will fire immediately (although this probably only affects developer testing)
my_eta = datetime.now(ZoneInfo('US/Eastern')) + timedelta(seconds=300)
my_eta = datetime.utcnow() + timedelta(seconds=300)
check_sms_delivery_receipt.apply_async(
[message_id, notification_id, now],
eta=my_eta,
queue=QueueNames.CHECK_SMS
[message_id, notification_id, now], eta=my_eta, queue=QueueNames.CHECK_SMS
)
except Exception as e:
if isinstance(e, SmsClientResponseException):
current_app.logger.warning(
"SMS notification delivery for id: {} failed".format(notification_id),
exc_info=True
exc_info=True,
)
else:
current_app.logger.exception(
@@ -76,16 +99,26 @@ def deliver_sms(self, notification_id):
else:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = "RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. " \
"Notification has been updated to technical-failure".format(notification_id)
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
)
update_notification_status_by_id(
notification_id, NOTIFICATION_TECHNICAL_FAILURE
)
raise NotificationTechnicalFailureException(message)
@notify_celery.task(bind=True, name="deliver_email", max_retries=48, default_retry_delay=300)
@notify_celery.task(
bind=True, name="deliver_email", max_retries=48, default_retry_delay=300
)
def deliver_email(self, notification_id):
try:
current_app.logger.info("Start sending email for notification id: {}".format(notification_id))
current_app.logger.info(
"Start sending email for notification id: {}".format(notification_id)
)
notification = notifications_dao.get_notification_by_id(notification_id)
if not notification:
raise NoResultFound()
@@ -94,7 +127,7 @@ def deliver_email(self, notification_id):
current_app.logger.exception(
f"Email notification {notification_id} failed: {e}"
)
update_notification_status_by_id(notification_id, 'technical-failure')
update_notification_status_by_id(notification_id, "technical-failure")
except Exception as e:
try:
if isinstance(e, AwsSesClientThrottlingSendRateException):
@@ -108,8 +141,14 @@ def deliver_email(self, notification_id):
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = "RETRY FAILED: Max retries reached. " \
"The task send_email_to_provider failed for notification {}. " \
"Notification has been updated to technical-failure".format(notification_id)
update_notification_status_by_id(notification_id, NOTIFICATION_TECHNICAL_FAILURE)
message = (
"RETRY FAILED: Max retries reached. "
"The task send_email_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
)
update_notification_status_by_id(
notification_id, NOTIFICATION_TECHNICAL_FAILURE
)
raise NotificationTechnicalFailureException(message)

View File

@@ -1,15 +1,11 @@
from datetime import datetime, timedelta
from flask import current_app
from notifications_utils.timezones import convert_utc_to_local_timezone
from app import notify_celery
from app.config import QueueNames
from app.cronitor import cronitor
from app.dao.fact_billing_dao import (
fetch_billing_data_for_day,
update_fact_billing,
)
from app.dao.fact_billing_dao import fetch_billing_data_for_day, update_fact_billing
from app.dao.fact_notification_status_dao import update_fact_notification_status
from app.dao.notifications_dao import get_service_ids_with_notifications_on_date
from app.models import EMAIL_TYPE, SMS_TYPE
@@ -21,7 +17,7 @@ def create_nightly_billing(day_start=None):
# day_start is a datetime.date() object. e.g.
# up to 4 days of data counting back from day_start is consolidated
if day_start is None:
day_start = convert_utc_to_local_timezone(datetime.utcnow()).date() - timedelta(days=1)
day_start = datetime.utcnow().date() - timedelta(days=1)
else:
# When calling the task its a string in the format of "YYYY-MM-DD"
day_start = datetime.strptime(day_start, "%Y-%m-%d").date()
@@ -29,8 +25,7 @@ def create_nightly_billing(day_start=None):
process_day = (day_start - timedelta(days=i)).isoformat()
create_nightly_billing_for_day.apply_async(
kwargs={'process_day': process_day},
queue=QueueNames.REPORTING
kwargs={"process_day": process_day}, queue=QueueNames.REPORTING
)
current_app.logger.info(
f"create-nightly-billing task: create-nightly-billing-for-day task created for {process_day}"
@@ -41,7 +36,7 @@ def create_nightly_billing(day_start=None):
def create_nightly_billing_for_day(process_day):
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
current_app.logger.info(
f'create-nightly-billing-for-day task for {process_day}: started'
f"create-nightly-billing-for-day task for {process_day}: started"
)
start = datetime.utcnow()
@@ -49,7 +44,7 @@ def create_nightly_billing_for_day(process_day):
end = datetime.utcnow()
current_app.logger.info(
f'create-nightly-billing-for-day task for {process_day}: data fetched in {(end - start).seconds} seconds'
f"create-nightly-billing-for-day task for {process_day}: data fetched in {(end - start).seconds} seconds"
)
for data in transit_data:
@@ -83,7 +78,7 @@ def create_nightly_notification_status():
mean the aggregated results are temporarily incorrect.
"""
yesterday = convert_utc_to_local_timezone(datetime.utcnow()).date() - timedelta(days=1)
yesterday = datetime.utcnow().date() - timedelta(days=1)
for notification_type in [SMS_TYPE, EMAIL_TYPE]:
days = 4
@@ -98,28 +93,30 @@ def create_nightly_notification_status():
for service_id in relevant_service_ids:
create_nightly_notification_status_for_service_and_day.apply_async(
kwargs={
'process_day': process_day.isoformat(),
'notification_type': notification_type,
'service_id': service_id,
"process_day": process_day.isoformat(),
"notification_type": notification_type,
"service_id": service_id,
},
queue=QueueNames.REPORTING
queue=QueueNames.REPORTING,
)
@notify_celery.task(name="create-nightly-notification-status-for-service-and-day")
def create_nightly_notification_status_for_service_and_day(process_day, service_id, notification_type):
def create_nightly_notification_status_for_service_and_day(
process_day, service_id, notification_type
):
process_day = datetime.strptime(process_day, "%Y-%m-%d").date()
start = datetime.utcnow()
update_fact_notification_status(
process_day=process_day,
notification_type=notification_type,
service_id=service_id
service_id=service_id,
)
end = datetime.utcnow()
current_app.logger.info(
f'create-nightly-notification-status-for-service-and-day task update '
f'for {service_id}, {notification_type} for {process_day}: '
f'updated in {(end - start).seconds} seconds'
f"create-nightly-notification-status-for-service-and-day task update "
f"for {service_id}, {notification_type} for {process_day}: "
f"updated in {(end - start).seconds} seconds"
)

View File

@@ -5,6 +5,7 @@ from requests import HTTPError, request
from app.celery.process_ses_receipts_tasks import process_ses_results
from app.config import QueueNames
from app.dao.notifications_dao import get_notification_by_id
from app.models import SMS_TYPE
temp_fail = "2028675303"
@@ -16,8 +17,8 @@ perm_fail_email = "perm-fail@simulator.notify"
temp_fail_email = "temp-fail@simulator.notify"
def send_sms_response(provider, reference, to):
body = sns_callback(reference, to)
def send_sms_response(provider, reference):
body = sns_callback(reference)
headers = {"Content-type": "application/json"}
make_request(SMS_TYPE, provider, body, headers)
@@ -35,22 +36,17 @@ def send_email_response(reference, to):
def make_request(notification_type, provider, data, headers):
api_call = "{}/notifications/{}/{}".format(current_app.config["API_HOST_NAME"], notification_type, provider)
api_call = "{}/notifications/{}/{}".format(
current_app.config["API_HOST_NAME"], notification_type, provider
)
try:
response = request(
"POST",
api_call,
headers=headers,
data=data,
timeout=60
)
response = request("POST", api_call, headers=headers, data=data, timeout=60)
response.raise_for_status()
except HTTPError as e:
current_app.logger.error(
"API POST request on {} failed with status {}".format(
api_call,
e.response.status_code
api_call, e.response.status_code
)
)
raise e
@@ -59,165 +55,136 @@ def make_request(notification_type, provider, data, headers):
return response.json()
def sns_callback(notification_id, to):
raise Exception("Need to update for SNS callback format along with test_send_to_providers")
def sns_callback(notification_id):
notification = get_notification_by_id(notification_id)
# example from mmg_callback
# if to.strip().endswith(temp_fail):
# # status: 4 - expired (temp failure)
# status = "4"
# elif to.strip().endswith(perm_fail):
# # status: 5 - rejected (perm failure)
# status = "5"
# else:
# # status: 3 - delivered
# status = "3"
# return json.dumps({"reference": "mmg_reference",
# "CID": str(notification_id),
# "MSISDN": to,
# "status": status,
# "deliverytime": "2016-04-05 16:01:07"})
# This will only work if all notifications, including successful ones, are in the notifications table
# If we decide to delete successful notifications, we will have to get this from notifications history
return json.dumps(
{
"CID": str(notification_id),
"status": notification.status,
# "deliverytime": notification.completed_at
}
)
def ses_notification_callback(reference):
ses_message_body = {
'delivery': {
'processingTimeMillis': 2003,
'recipients': ['success@simulator.amazonses.com'],
'remoteMtaIp': '123.123.123.123',
'reportingMTA': 'a7-32.smtp-out.us-west-2.amazonses.com',
'smtpResponse': '250 2.6.0 Message received',
'timestamp': '2017-11-17T12:14:03.646Z'
"delivery": {
"processingTimeMillis": 2003,
"recipients": ["success@simulator.amazonses.com"],
"remoteMtaIp": "123.123.123.123",
"reportingMTA": "a7-32.smtp-out.us-west-2.amazonses.com",
"smtpResponse": "250 2.6.0 Message received",
"timestamp": "2017-11-17T12:14:03.646Z",
},
'mail': {
'commonHeaders': {
'from': ['TEST <TEST@notify.works>'],
'subject': 'lambda test',
'to': ['success@simulator.amazonses.com']
"mail": {
"commonHeaders": {
"from": ["TEST <TEST@notify.works>"],
"subject": "lambda test",
"to": ["success@simulator.amazonses.com"],
},
'destination': ['success@simulator.amazonses.com'],
'headers': [
"destination": ["success@simulator.amazonses.com"],
"headers": [
{"name": "From", "value": "TEST <TEST@notify.works>"},
{"name": "To", "value": "success@simulator.amazonses.com"},
{"name": "Subject", "value": "lambda test"},
{"name": "MIME-Version", "value": "1.0"},
{
'name': 'From',
'value': 'TEST <TEST@notify.works>'
"name": "Content-Type",
"value": 'multipart/alternative; boundary="----=_Part_617203_1627511946.1510920841645"',
},
{
'name': 'To',
'value': 'success@simulator.amazonses.com'
},
{
'name': 'Subject',
'value': 'lambda test'
},
{
'name': 'MIME-Version',
'value': '1.0'
},
{
'name': 'Content-Type',
'value': 'multipart/alternative; boundary="----=_Part_617203_1627511946.1510920841645"'
}
],
'headersTruncated': False,
'messageId': reference,
'sendingAccountId': '12341234',
'source': '"TEST" <TEST@notify.works>',
'sourceArn': 'arn:aws:ses:us-west-2:12341234:identity/notify.works',
'sourceIp': '0.0.0.1',
'timestamp': '2017-11-17T12:14:01.643Z'
"headersTruncated": False,
"messageId": reference,
"sendingAccountId": "12341234",
"source": '"TEST" <TEST@notify.works>',
"sourceArn": "arn:aws:ses:us-west-2:12341234:identity/notify.works",
"sourceIp": "0.0.0.1",
"timestamp": "2017-11-17T12:14:01.643Z",
},
'notificationType': 'Delivery'
"notificationType": "Delivery",
}
return {
'Type': 'Notification',
'MessageId': '8e83c020-1234-1234-1234-92a8ee9baa0a',
'TopicArn': 'arn:aws:sns:us-west-2:12341234:ses_notifications',
'Subject': None,
'Message': json.dumps(ses_message_body),
'Timestamp': '2017-11-17T12:14:03.710Z',
'SignatureVersion': '1',
'Signature': '[REDACTED]',
'SigningCertUrl': 'https://sns.us-west-2.amazonaws.com/SimpleNotificationService-[REDACTED].pem',
'UnsubscribeUrl': 'https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]',
'MessageAttributes': {}
"Type": "Notification",
"MessageId": "8e83c020-1234-1234-1234-92a8ee9baa0a",
"TopicArn": "arn:aws:sns:us-west-2:12341234:ses_notifications",
"Subject": None,
"Message": json.dumps(ses_message_body),
"Timestamp": "2017-11-17T12:14:03.710Z",
"SignatureVersion": "1",
"Signature": "[REDACTED]",
"SigningCertUrl": "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-[REDACTED].pem",
"UnsubscribeUrl": "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]",
"MessageAttributes": {},
}
def ses_hard_bounce_callback(reference):
return _ses_bounce_callback(reference, 'Permanent')
return _ses_bounce_callback(reference, "Permanent")
def ses_soft_bounce_callback(reference):
return _ses_bounce_callback(reference, 'Temporary')
return _ses_bounce_callback(reference, "Temporary")
def _ses_bounce_callback(reference, bounce_type):
ses_message_body = {
'bounce': {
'bounceSubType': 'General',
'bounceType': bounce_type,
'bouncedRecipients': [{
'action': 'failed',
'diagnosticCode': 'smtp; 550 5.1.1 user unknown',
'emailAddress': 'bounce@simulator.amazonses.com',
'status': '5.1.1'
}],
'feedbackId': '0102015fc9e676fb-12341234-1234-1234-1234-9301e86a4fa8-000000',
'remoteMtaIp': '123.123.123.123',
'reportingMTA': 'dsn; a7-31.smtp-out.us-west-2.amazonses.com',
'timestamp': '2017-11-17T12:14:05.131Z'
},
'mail': {
'commonHeaders': {
'from': ['TEST <TEST@notify.works>'],
'subject': 'ses callback test',
'to': ['bounce@simulator.amazonses.com']
},
'destination': ['bounce@simulator.amazonses.com'],
'headers': [
"bounce": {
"bounceSubType": "General",
"bounceType": bounce_type,
"bouncedRecipients": [
{
'name': 'From',
'value': 'TEST <TEST@notify.works>'
},
{
'name': 'To',
'value': 'bounce@simulator.amazonses.com'
},
{
'name': 'Subject',
'value': 'lambda test'
},
{
'name': 'MIME-Version',
'value': '1.0'
},
{
'name': 'Content-Type',
'value': 'multipart/alternative; boundary="----=_Part_596529_2039165601.1510920843367"'
"action": "failed",
"diagnosticCode": "smtp; 550 5.1.1 user unknown",
"emailAddress": "bounce@simulator.amazonses.com",
"status": "5.1.1",
}
],
'headersTruncated': False,
'messageId': reference,
'sendingAccountId': '12341234',
'source': '"TEST" <TEST@notify.works>',
'sourceArn': 'arn:aws:ses:us-west-2:12341234:identity/notify.works',
'sourceIp': '0.0.0.1',
'timestamp': '2017-11-17T12:14:03.000Z'
"feedbackId": "0102015fc9e676fb-12341234-1234-1234-1234-9301e86a4fa8-000000",
"remoteMtaIp": "123.123.123.123",
"reportingMTA": "dsn; a7-31.smtp-out.us-west-2.amazonses.com",
"timestamp": "2017-11-17T12:14:05.131Z",
},
'notificationType': 'Bounce'
"mail": {
"commonHeaders": {
"from": ["TEST <TEST@notify.works>"],
"subject": "ses callback test",
"to": ["bounce@simulator.amazonses.com"],
},
"destination": ["bounce@simulator.amazonses.com"],
"headers": [
{"name": "From", "value": "TEST <TEST@notify.works>"},
{"name": "To", "value": "bounce@simulator.amazonses.com"},
{"name": "Subject", "value": "lambda test"},
{"name": "MIME-Version", "value": "1.0"},
{
"name": "Content-Type",
"value": 'multipart/alternative; boundary="----=_Part_596529_2039165601.1510920843367"',
},
],
"headersTruncated": False,
"messageId": reference,
"sendingAccountId": "12341234",
"source": '"TEST" <TEST@notify.works>',
"sourceArn": "arn:aws:ses:us-west-2:12341234:identity/notify.works",
"sourceIp": "0.0.0.1",
"timestamp": "2017-11-17T12:14:03.000Z",
},
"notificationType": "Bounce",
}
return {
'Type': 'Notification',
'MessageId': '36e67c28-1234-1234-1234-2ea0172aa4a7',
'TopicArn': 'arn:aws:sns:us-west-2:12341234:ses_notifications',
'Subject': None,
'Message': json.dumps(ses_message_body),
'Timestamp': '2017-11-17T12:14:05.149Z',
'SignatureVersion': '1',
'Signature': '[REDACTED]', # noqa
'SigningCertUrl': 'https://sns.us-west-2.amazonaws.com/SimpleNotificationService-[REDACTED]].pem',
'UnsubscribeUrl': 'https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REDACTED]]',
'MessageAttributes': {}
"Type": "Notification",
"MessageId": "36e67c28-1234-1234-1234-2ea0172aa4a7",
"TopicArn": "arn:aws:sns:us-west-2:12341234:ses_notifications",
"Subject": None,
"Message": json.dumps(ses_message_body),
"Timestamp": "2017-11-17T12:14:05.149Z",
"SignatureVersion": "1",
"Signature": "[REDACTED]", # noqa
"SigningCertUrl": "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-[REDACTED]].pem",
"UnsubscribeUrl": "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REDACTED]]",
"MessageAttributes": {},
}

View File

@@ -1,9 +1,7 @@
from datetime import datetime, timedelta
from flask import current_app
from notifications_utils.clients.zendesk.zendesk_client import (
NotifySupportTicket,
)
from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket
from sqlalchemy import between
from sqlalchemy.exc import SQLAlchemyError
@@ -18,9 +16,7 @@ from app.config import QueueNames
from app.dao.invited_org_user_dao import (
delete_org_invitations_created_more_than_two_days_ago,
)
from app.dao.invited_user_dao import (
delete_invitations_created_more_than_two_days_ago,
)
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
from app.dao.jobs_dao import (
dao_set_scheduled_jobs_to_pending,
dao_update_job,
@@ -28,9 +24,6 @@ from app.dao.jobs_dao import (
find_missing_row_for_job,
)
from app.dao.notifications_dao import notifications_not_yet_sent
from app.dao.provider_details_dao import (
dao_adjust_provider_priority_back_to_resting_points,
)
from app.dao.services_dao import (
dao_find_services_sending_to_tv_numbers,
dao_find_services_with_high_failure_rates,
@@ -52,7 +45,9 @@ def run_scheduled_jobs():
try:
for job in dao_set_scheduled_jobs_to_pending():
process_job.apply_async([str(job.id)], queue=QueueNames.JOBS)
current_app.logger.info("Job ID {} added to process job queue".format(job.id))
current_app.logger.info(
"Job ID {} added to process job queue".format(job.id)
)
except SQLAlchemyError:
current_app.logger.exception("Failed to run scheduled jobs")
raise
@@ -64,7 +59,9 @@ def delete_verify_codes():
start = datetime.utcnow()
deleted = delete_codes_older_created_more_than_a_day_ago()
current_app.logger.info(
"Delete job started {} finished {} deleted {} verify codes".format(start, datetime.utcnow(), deleted)
"Delete job started {} finished {} deleted {} verify codes".format(
start, datetime.utcnow(), deleted
)
)
except SQLAlchemyError:
current_app.logger.exception("Failed to delete verify codes")
@@ -78,19 +75,16 @@ def delete_invitations():
deleted_invites = delete_invitations_created_more_than_two_days_ago()
deleted_invites += delete_org_invitations_created_more_than_two_days_ago()
current_app.logger.info(
"Delete job started {} finished {} deleted {} invitations".format(start, datetime.utcnow(), deleted_invites)
"Delete job started {} finished {} deleted {} invitations".format(
start, datetime.utcnow(), deleted_invites
)
)
except SQLAlchemyError:
current_app.logger.exception("Failed to delete invitations")
raise
@notify_celery.task(name='tend-providers-back-to-middle')
def tend_providers_back_to_middle():
dao_adjust_provider_priority_back_to_resting_points()
@notify_celery.task(name='check-job-status')
@notify_celery.task(name="check-job-status")
def check_job_status():
"""
every x minutes do this check
@@ -109,19 +103,19 @@ def check_job_status():
incomplete_in_progress_jobs = Job.query.filter(
Job.job_status == JOB_STATUS_IN_PROGRESS,
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago)
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
)
incomplete_pending_jobs = Job.query.filter(
Job.job_status == JOB_STATUS_PENDING,
Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago)
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
)
jobs_not_complete_after_30_minutes = incomplete_in_progress_jobs.union(
incomplete_pending_jobs
).order_by(
Job.processing_started, Job.scheduled_for
).all()
jobs_not_complete_after_30_minutes = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for)
.all()
)
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
@@ -133,52 +127,65 @@ def check_job_status():
if job_ids:
current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
process_incomplete_jobs.apply_async(
[job_ids],
queue=QueueNames.JOBS
)
process_incomplete_jobs.apply_async([job_ids], queue=QueueNames.JOBS)
@notify_celery.task(name='replay-created-notifications')
@notify_celery.task(name="replay-created-notifications")
def replay_created_notifications():
# if the notification has not be send after 1 hour, then try to resend.
resend_created_notifications_older_than = (60 * 60)
resend_created_notifications_older_than = 60 * 60
for notification_type in (EMAIL_TYPE, SMS_TYPE):
notifications_to_resend = notifications_not_yet_sent(
resend_created_notifications_older_than,
notification_type
resend_created_notifications_older_than, notification_type
)
if len(notifications_to_resend) > 0:
current_app.logger.info("Sending {} {} notifications "
"to the delivery queue because the notification "
"status was created.".format(len(notifications_to_resend), notification_type))
current_app.logger.info(
"Sending {} {} notifications "
"to the delivery queue because the notification "
"status was created.".format(
len(notifications_to_resend), notification_type
)
)
for n in notifications_to_resend:
send_notification_to_queue(notification=n, research_mode=n.service.research_mode)
send_notification_to_queue(notification=n)
@notify_celery.task(name='check-for-missing-rows-in-completed-jobs')
@notify_celery.task(name="check-for-missing-rows-in-completed-jobs")
def check_for_missing_rows_in_completed_jobs():
jobs = find_jobs_with_missing_rows()
for job in jobs:
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
(
recipient_csv,
template,
sender_id,
) = get_recipient_csv_and_template_and_sender_id(job)
missing_rows = find_missing_row_for_job(job.id, job.notification_count)
for row_to_process in missing_rows:
row = recipient_csv[row_to_process.missing_row]
current_app.logger.info(
"Processing missing row: {} for job: {}".format(row_to_process.missing_row, job.id))
"Processing missing row: {} for job: {}".format(
row_to_process.missing_row, job.id
)
)
process_row(row, template, job, job.service, sender_id=sender_id)
@notify_celery.task(name='check-for-services-with-high-failure-rates-or-sending-to-tv-numbers')
@notify_celery.task(
name="check-for-services-with-high-failure-rates-or-sending-to-tv-numbers"
)
def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
start_date = (datetime.utcnow() - timedelta(days=1))
start_date = datetime.utcnow() - timedelta(days=1)
end_date = datetime.utcnow()
message = ""
services_with_failures = dao_find_services_with_high_failure_rates(start_date=start_date, end_date=end_date)
services_sending_to_tv_numbers = dao_find_services_sending_to_tv_numbers(start_date=start_date, end_date=end_date)
services_with_failures = dao_find_services_with_high_failure_rates(
start_date=start_date, end_date=end_date
)
services_sending_to_tv_numbers = dao_find_services_sending_to_tv_numbers(
start_date=start_date, end_date=end_date
)
if services_with_failures:
message += "{} service(s) have had high permanent-failure rates for sms messages in last 24 hours:\n".format(
@@ -186,17 +193,19 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
)
for service in services_with_failures:
service_dashboard = "{}/services/{}".format(
current_app.config['ADMIN_BASE_URL'],
current_app.config["ADMIN_BASE_URL"],
str(service.service_id),
)
message += "service: {} failure rate: {},\n".format(service_dashboard, service.permanent_failure_rate)
message += "service: {} failure rate: {},\n".format(
service_dashboard, service.permanent_failure_rate
)
elif services_sending_to_tv_numbers:
message += "{} service(s) have sent over 500 sms messages to tv numbers in last 24 hours:\n".format(
len(services_sending_to_tv_numbers)
)
for service in services_sending_to_tv_numbers:
service_dashboard = "{}/services/{}".format(
current_app.config['ADMIN_BASE_URL'],
current_app.config["ADMIN_BASE_URL"],
str(service.service_id),
)
message += "service: {} count of sms to tv numbers: {},\n".format(
@@ -206,13 +215,15 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
if services_with_failures or services_sending_to_tv_numbers:
current_app.logger.warning(message)
if current_app.config['NOTIFY_ENVIRONMENT'] in ['live', 'production', 'test']:
message += ("\nYou can find instructions for this ticket in our manual:\n"
"https://github.com/alphagov/notifications-manuals/wiki/Support-Runbook#Deal-with-services-with-high-failure-rates-or-sending-sms-to-tv-numbers") # noqa
if current_app.config["NOTIFY_ENVIRONMENT"] in ["live", "production", "test"]:
message += (
"\nYou can find instructions for this ticket in our manual:\n"
"https://github.com/alphagov/notifications-manuals/wiki/Support-Runbook#Deal-with-services-with-high-failure-rates-or-sending-sms-to-tv-numbers" # noqa
)
ticket = NotifySupportTicket(
subject=f"[{current_app.config['NOTIFY_ENVIRONMENT']}] High failure rates for sms spotted for services",
message=message,
ticket_type=NotifySupportTicket.TYPE_INCIDENT,
technical_ticket=True
technical_ticket=True,
)
zendesk_client.send_ticket_to_zendesk(ticket)

View File

@@ -8,102 +8,106 @@ from app.config import QueueNames
from app.utils import DATETIME_FORMAT
@notify_celery.task(bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300)
def send_delivery_status_to_service(
self, notification_id, encrypted_status_update
):
@notify_celery.task(
bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300
)
def send_delivery_status_to_service(self, notification_id, encrypted_status_update):
status_update = encryption.decrypt(encrypted_status_update)
data = {
"id": str(notification_id),
"reference": status_update['notification_client_reference'],
"to": status_update['notification_to'],
"status": status_update['notification_status'],
"created_at": status_update['notification_created_at'],
"completed_at": status_update['notification_updated_at'],
"sent_at": status_update['notification_sent_at'],
"notification_type": status_update['notification_type'],
"template_id": status_update['template_id'],
"template_version": status_update['template_version']
"reference": status_update["notification_client_reference"],
"to": status_update["notification_to"],
"status": status_update["notification_status"],
"created_at": status_update["notification_created_at"],
"completed_at": status_update["notification_updated_at"],
"sent_at": status_update["notification_sent_at"],
"notification_type": status_update["notification_type"],
"template_id": status_update["template_id"],
"template_version": status_update["template_version"],
}
_send_data_to_service_callback_api(
self,
data,
status_update['service_callback_api_url'],
status_update['service_callback_api_bearer_token'],
'send_delivery_status_to_service'
status_update["service_callback_api_url"],
status_update["service_callback_api_bearer_token"],
"send_delivery_status_to_service",
)
@notify_celery.task(bind=True, name="send-complaint", max_retries=5, default_retry_delay=300)
@notify_celery.task(
bind=True, name="send-complaint", max_retries=5, default_retry_delay=300
)
def send_complaint_to_service(self, complaint_data):
complaint = encryption.decrypt(complaint_data)
data = {
"notification_id": complaint['notification_id'],
"complaint_id": complaint['complaint_id'],
"reference": complaint['reference'],
"to": complaint['to'],
"complaint_date": complaint['complaint_date']
"notification_id": complaint["notification_id"],
"complaint_id": complaint["complaint_id"],
"reference": complaint["reference"],
"to": complaint["to"],
"complaint_date": complaint["complaint_date"],
}
_send_data_to_service_callback_api(
self,
data,
complaint['service_callback_api_url'],
complaint['service_callback_api_bearer_token'],
'send_complaint_to_service'
complaint["service_callback_api_url"],
complaint["service_callback_api_bearer_token"],
"send_complaint_to_service",
)
def _send_data_to_service_callback_api(self, data, service_callback_url, token, function_name):
notification_id = (data["notification_id"] if "notification_id" in data else data["id"])
def _send_data_to_service_callback_api(
self, data, service_callback_url, token, function_name
):
notification_id = (
data["notification_id"] if "notification_id" in data else data["id"]
)
try:
response = request(
method="POST",
url=service_callback_url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(token)
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(token),
},
timeout=5
timeout=5,
)
current_app.logger.info(
"{} sending {} to {}, response {}".format(
function_name,
notification_id,
service_callback_url,
response.status_code,
)
)
current_app.logger.info('{} sending {} to {}, response {}'.format(
function_name,
notification_id,
service_callback_url,
response.status_code
))
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"{} request failed for notification_id: {} and url: {}. exception: {}".format(
function_name,
notification_id,
service_callback_url,
e
function_name, notification_id, service_callback_url, e
)
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500 or e.response.status_code == 429:
if (
not isinstance(e, HTTPError)
or e.response.status_code >= 500
or e.response.status_code == 429
):
try:
self.retry(queue=QueueNames.CALLBACKS_RETRY)
except self.MaxRetriesExceededError:
current_app.logger.warning(
"Retry: {} has retried the max num of times for callback url {} and notification_id: {}".format(
function_name,
service_callback_url,
notification_id
function_name, service_callback_url, notification_id
)
)
else:
current_app.logger.warning(
"{} callback is not being retried for notification_id: {} and url: {}. exception: {}".format(
function_name,
notification_id,
service_callback_url,
e
function_name, notification_id, service_callback_url, e
)
)
@@ -116,9 +120,12 @@ def create_delivery_status_callback_data(notification, service_callback_api):
"notification_status": notification.status,
"notification_provider_response": notification.provider_response, # TODO do we test for provider_response?
"notification_created_at": notification.created_at.strftime(DATETIME_FORMAT),
"notification_updated_at":
notification.updated_at.strftime(DATETIME_FORMAT) if notification.updated_at else None,
"notification_sent_at": notification.sent_at.strftime(DATETIME_FORMAT) if notification.sent_at else None,
"notification_updated_at": notification.updated_at.strftime(DATETIME_FORMAT)
if notification.updated_at
else None,
"notification_sent_at": notification.sent_at.strftime(DATETIME_FORMAT)
if notification.sent_at
else None,
"notification_type": notification.notification_type,
"service_callback_api_url": service_callback_api.url,
"service_callback_api_bearer_token": service_callback_api.bearer_token,
@@ -128,7 +135,9 @@ def create_delivery_status_callback_data(notification, service_callback_api):
return encryption.encrypt(data)
def create_complaint_callback_data(complaint, notification, service_callback_api, recipient):
def create_complaint_callback_data(
complaint, notification, service_callback_api, recipient
):
data = {
"complaint_id": str(complaint.id),
"notification_id": str(notification.id),

View File

@@ -30,21 +30,21 @@ from app.models import (
SMS_TYPE,
)
from app.notifications.process_notifications import persist_notification
from app.notifications.validators import (
check_service_over_daily_message_limit,
check_service_over_total_message_limit,
)
from app.notifications.validators import check_service_over_total_message_limit
from app.serialised_models import SerialisedService, SerialisedTemplate
from app.service.utils import service_allowed_to_send_to
from app.utils import DATETIME_FORMAT
from app.v2.errors import TooManyRequestsError
@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None):
start = datetime.utcnow()
job = dao_get_job_by_id(job_id)
current_app.logger.info("Starting process-job task for job id {} with status: {}".format(job_id, job.job_status))
current_app.logger.info(
"Starting process-job task for job id {} with status: {}".format(
job_id, job.job_status
)
)
if job.job_status != JOB_STATUS_PENDING:
return
@@ -59,18 +59,24 @@ def process_job(job_id, sender_id=None):
job.job_status = JOB_STATUS_CANCELLED
dao_update_job(job)
current_app.logger.warning(
"Job {} has been cancelled, service {} is inactive".format(job_id, service.id))
return
if __daily_sending_limits_for_job_exceeded(service, job, job_id):
"Job {} has been cancelled, service {} is inactive".format(
job_id, service.id
)
)
return
if __total_sending_limits_for_job_exceeded(service, job, job_id):
return
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(
job
)
current_app.logger.info("Starting job {} processing {} notifications".format(job_id, job.notification_count))
current_app.logger.info(
"Starting job {} processing {} notifications".format(
job_id, job.notification_count
)
)
for row in recipient_csv.get_rows():
process_row(row, template, job, service, sender_id=sender_id)
@@ -91,7 +97,9 @@ def job_complete(job, resumed=False, start=None):
)
else:
current_app.logger.info(
"Job {} created at {} started at {} finished at {}".format(job.id, job.created_at, start, finished)
"Job {} created at {} started at {} finished at {}".format(
job.id, job.created_at, start, finished
)
)
@@ -99,7 +107,9 @@ def get_recipient_csv_and_template_and_sender_id(job):
db_template = dao_get_template_by_id(job.template_id, job.template_version)
template = db_template._as_utils_template()
contents, meta_data = s3.get_job_and_metadata_from_s3(service_id=str(job.service_id), job_id=str(job.id))
contents, meta_data = s3.get_job_and_metadata_from_s3(
service_id=str(job.service_id), job_id=str(job.id)
)
recipient_csv = RecipientCSV(contents, template=template)
return recipient_csv, template, meta_data.get("sender_id")
@@ -107,25 +117,24 @@ def get_recipient_csv_and_template_and_sender_id(job):
def process_row(row, template, job, service, sender_id=None):
template_type = template.template_type
encrypted = encryption.encrypt({
'template': str(template.id),
'template_version': job.template_version,
'job': str(job.id),
'to': row.recipient,
'row_number': row.index,
'personalisation': dict(row.personalisation)
})
encrypted = encryption.encrypt(
{
"template": str(template.id),
"template_version": job.template_version,
"job": str(job.id),
"to": row.recipient,
"row_number": row.index,
"personalisation": dict(row.personalisation),
}
)
send_fns = {
SMS_TYPE: save_sms,
EMAIL_TYPE: save_email
}
send_fns = {SMS_TYPE: save_sms, EMAIL_TYPE: save_email}
send_fn = send_fns[template_type]
task_kwargs = {}
if sender_id:
task_kwargs['sender_id'] = sender_id
task_kwargs["sender_id"] = sender_id
notification_id = create_uuid()
send_fn.apply_async(
@@ -135,29 +144,11 @@ def process_row(row, template, job, service, sender_id=None):
encrypted,
),
task_kwargs,
queue=QueueNames.DATABASE if not service.research_mode else QueueNames.RESEARCH_MODE
queue=QueueNames.DATABASE,
)
return notification_id
def __daily_sending_limits_for_job_exceeded(service, job, job_id):
try:
total_daily_sent = check_service_over_daily_message_limit(KEY_TYPE_NORMAL, service)
if total_daily_sent + job.notification_count > service.message_limit:
raise TooManyRequestsError(service.message_limit)
else:
return False
except TooManyRequestsError:
job.job_status = 'sending limits exceeded'
job.processing_finished = datetime.utcnow()
dao_update_job(job)
current_app.logger.info(
"Job {} size {} error. Daily ending limits {} exceeded".format(
job_id, job.notification_count, service.message_limit)
)
return True
def __total_sending_limits_for_job_exceeded(service, job, job_id):
try:
total_sent = check_service_over_total_message_limit(KEY_TYPE_NORMAL, service)
@@ -177,25 +168,23 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
@notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
def save_sms(self,
service_id,
notification_id,
encrypted_notification,
sender_id=None):
def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
notification = encryption.decrypt(encrypted_notification)
service = SerialisedService.from_id(service_id)
template = SerialisedTemplate.from_id_and_service_id(
notification['template'],
notification["template"],
service_id=service.id,
version=notification['template_version'],
version=notification["template_version"],
)
if sender_id:
reply_to_text = dao_get_service_sms_senders_by_id(service_id, sender_id).sms_sender
reply_to_text = dao_get_service_sms_senders_by_id(
service_id, sender_id
).sms_sender
else:
reply_to_text = template.reply_to_text
if not service_allowed_to_send_to(notification['to'], service, KEY_TYPE_NORMAL):
if not service_allowed_to_send_to(notification["to"], service, KEY_TYPE_NORMAL):
current_app.logger.debug(
"SMS {} failed as restricted service".format(notification_id)
)
@@ -203,50 +192,50 @@ def save_sms(self,
try:
saved_notification = persist_notification(
template_id=notification['template'],
template_version=notification['template_version'],
recipient=notification['to'],
template_id=notification["template"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get('personalisation'),
personalisation=notification.get("personalisation"),
notification_type=SMS_TYPE,
api_key_id=None,
key_type=KEY_TYPE_NORMAL,
created_at=datetime.utcnow(),
job_id=notification.get('job', None),
job_row_number=notification.get('row_number', None),
job_id=notification.get("job", None),
job_row_number=notification.get("row_number", None),
notification_id=notification_id,
reply_to_text=reply_to_text
reply_to_text=reply_to_text,
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_SMS if not service.research_mode else QueueNames.RESEARCH_MODE
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
)
current_app.logger.debug(
"SMS {} created at {} for job {}".format(
saved_notification.id,
saved_notification.created_at,
notification.get('job', None))
notification.get("job", None),
)
)
except SQLAlchemyError as e:
handle_exception(self, notification, notification_id, e)
@notify_celery.task(bind=True, name="save-email", max_retries=5, default_retry_delay=300)
def save_email(self,
service_id,
notification_id,
encrypted_notification,
sender_id=None):
@notify_celery.task(
bind=True, name="save-email", max_retries=5, default_retry_delay=300
)
def save_email(
self, service_id, notification_id, encrypted_notification, sender_id=None
):
notification = encryption.decrypt(encrypted_notification)
service = SerialisedService.from_id(service_id)
template = SerialisedTemplate.from_id_and_service_id(
notification['template'],
notification["template"],
service_id=service.id,
version=notification['template_version'],
version=notification["template_version"],
)
if sender_id:
@@ -254,127 +243,143 @@ def save_email(self,
else:
reply_to_text = template.reply_to_text
if not service_allowed_to_send_to(notification['to'], service, KEY_TYPE_NORMAL):
current_app.logger.info("Email {} failed as restricted service".format(notification_id))
if not service_allowed_to_send_to(notification["to"], service, KEY_TYPE_NORMAL):
current_app.logger.info(
"Email {} failed as restricted service".format(notification_id)
)
return
try:
saved_notification = persist_notification(
template_id=notification['template'],
template_version=notification['template_version'],
recipient=notification['to'],
template_id=notification["template"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get('personalisation'),
personalisation=notification.get("personalisation"),
notification_type=EMAIL_TYPE,
api_key_id=None,
key_type=KEY_TYPE_NORMAL,
created_at=datetime.utcnow(),
job_id=notification.get('job', None),
job_row_number=notification.get('row_number', None),
job_id=notification.get("job", None),
job_row_number=notification.get("row_number", None),
notification_id=notification_id,
reply_to_text=reply_to_text
reply_to_text=reply_to_text,
)
provider_tasks.deliver_email.apply_async(
[str(saved_notification.id)],
queue=QueueNames.SEND_EMAIL if not service.research_mode else QueueNames.RESEARCH_MODE
[str(saved_notification.id)], queue=QueueNames.SEND_EMAIL
)
current_app.logger.debug("Email {} created at {}".format(saved_notification.id, saved_notification.created_at))
current_app.logger.debug(
"Email {} created at {}".format(
saved_notification.id, saved_notification.created_at
)
)
except SQLAlchemyError as e:
handle_exception(self, notification, notification_id, e)
@notify_celery.task(bind=True, name="save-api-email", max_retries=5, default_retry_delay=300)
@notify_celery.task(
bind=True, name="save-api-email", max_retries=5, default_retry_delay=300
)
def save_api_email(self, encrypted_notification):
save_api_email_or_sms(self, encrypted_notification)
@notify_celery.task(bind=True, name="save-api-sms", max_retries=5, default_retry_delay=300)
@notify_celery.task(
bind=True, name="save-api-sms", max_retries=5, default_retry_delay=300
)
def save_api_sms(self, encrypted_notification):
save_api_email_or_sms(self, encrypted_notification)
def save_api_email_or_sms(self, encrypted_notification):
notification = encryption.decrypt(encrypted_notification)
service = SerialisedService.from_id(notification['service_id'])
q = QueueNames.SEND_EMAIL if notification['notification_type'] == EMAIL_TYPE else QueueNames.SEND_SMS
provider_task = provider_tasks.deliver_email if notification['notification_type'] == EMAIL_TYPE \
service = SerialisedService.from_id(notification["service_id"])
q = (
QueueNames.SEND_EMAIL
if notification["notification_type"] == EMAIL_TYPE
else QueueNames.SEND_SMS
)
provider_task = (
provider_tasks.deliver_email
if notification["notification_type"] == EMAIL_TYPE
else provider_tasks.deliver_sms
)
try:
persist_notification(
notification_id=notification["id"],
template_id=notification['template_id'],
template_version=notification['template_version'],
recipient=notification['to'],
template_id=notification["template_id"],
template_version=notification["template_version"],
recipient=notification["to"],
service=service,
personalisation=notification.get('personalisation'),
notification_type=notification['notification_type'],
client_reference=notification['client_reference'],
api_key_id=notification.get('api_key_id'),
personalisation=notification.get("personalisation"),
notification_type=notification["notification_type"],
client_reference=notification["client_reference"],
api_key_id=notification.get("api_key_id"),
key_type=KEY_TYPE_NORMAL,
created_at=notification['created_at'],
reply_to_text=notification['reply_to_text'],
status=notification['status'],
document_download_count=notification['document_download_count']
created_at=notification["created_at"],
reply_to_text=notification["reply_to_text"],
status=notification["status"],
document_download_count=notification["document_download_count"],
)
q = q if not service.research_mode else QueueNames.RESEARCH_MODE
provider_task.apply_async(
[notification['id']],
queue=q
)
provider_task.apply_async([notification["id"]], queue=q)
current_app.logger.debug(
f"{notification['notification_type']} {notification['id']} has been persisted and sent to delivery queue."
)
except IntegrityError:
current_app.logger.info(f"{notification['notification_type']} {notification['id']} already exists.")
current_app.logger.info(
f"{notification['notification_type']} {notification['id']} already exists."
)
except SQLAlchemyError:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.error(f"Max retry failed Failed to persist notification {notification['id']}")
current_app.logger.error(
f"Max retry failed Failed to persist notification {notification['id']}"
)
def handle_exception(task, notification, notification_id, exc):
if not get_notification_by_id(notification_id):
retry_msg = '{task} notification for job {job} row number {row} and notification id {noti}'.format(
retry_msg = "{task} notification for job {job} row number {row} and notification id {noti}".format(
task=task.__name__,
job=notification.get('job', None),
row=notification.get('row_number', None),
noti=notification_id
job=notification.get("job", None),
row=notification.get("row_number", None),
noti=notification_id,
)
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
# send to the retry queue.
# This probably (hopefully) is not an issue with Redis as the celery backing store
current_app.logger.exception('Retry' + retry_msg)
current_app.logger.exception("Retry" + retry_msg)
try:
task.retry(queue=QueueNames.RETRY, exc=exc)
except task.MaxRetriesExceededError:
current_app.logger.error('Max retry failed' + retry_msg)
current_app.logger.error("Max retry failed" + retry_msg)
@notify_celery.task(bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300)
@notify_celery.task(
bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300
)
def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
inbound_api = get_service_inbound_api_for_service(service_id=service_id)
if not inbound_api:
# No API data has been set for this service
return
inbound_sms = dao_get_inbound_sms_by_id(service_id=service_id,
inbound_id=inbound_sms_id)
inbound_sms = dao_get_inbound_sms_by_id(
service_id=service_id, inbound_id=inbound_sms_id
)
data = {
"id": str(inbound_sms.id),
# TODO: should we be validating and formatting the phone number here?
"source_number": inbound_sms.user_number,
"destination_number": inbound_sms.notify_number,
"message": inbound_sms.content,
"date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT)
"date_received": inbound_sms.provider_date.strftime(DATETIME_FORMAT),
}
try:
@@ -383,37 +388,37 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
url=inbound_api.url,
data=json.dumps(data),
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(inbound_api.bearer_token)
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(inbound_api.bearer_token),
},
timeout=60
timeout=60,
)
current_app.logger.debug(
f"send_inbound_sms_to_service sending {inbound_sms_id} to {inbound_api.url}, " +
f"response {response.status_code}"
f"send_inbound_sms_to_service sending {inbound_sms_id} to {inbound_api.url}, "
+ f"response {response.status_code}"
)
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
f"send_inbound_sms_to_service failed for service_id: {service_id} for inbound_sms_id: {inbound_sms_id} " +
f"and url: {inbound_api.url}. exception: {e}"
f"send_inbound_sms_to_service failed for service_id: {service_id} for inbound_sms_id: {inbound_sms_id} "
+ f"and url: {inbound_api.url}. exception: {e}"
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.error(
"Retry: send_inbound_sms_to_service has retried the max number of" +
f"times for service: {service_id} and inbound_sms {inbound_sms_id}"
"Retry: send_inbound_sms_to_service has retried the max number of"
+ f"times for service: {service_id} and inbound_sms {inbound_sms_id}"
)
else:
current_app.logger.warning(
f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for " +
f"inbound_sms id: {inbound_sms_id} and url: {inbound_api.url}. exception: {e}"
f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for "
+ f"inbound_sms id: {inbound_sms_id} and url: {inbound_api.url}. exception: {e}"
)
@notify_celery.task(name='process-incomplete-jobs')
@notify_celery.task(name="process-incomplete-jobs")
def process_incomplete_jobs(job_ids):
jobs = [dao_get_job_by_id(job_id) for job_id in job_ids]
@@ -438,9 +443,13 @@ def process_incomplete_job(job_id):
else:
resume_from_row = -1 # The first row in the csv with a number is row 0
current_app.logger.info("Resuming job {} from row {}".format(job_id, resume_from_row))
current_app.logger.info(
"Resuming job {} from row {}".format(job_id, resume_from_row)
)
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(job)
recipient_csv, template, sender_id = get_recipient_csv_and_template_and_sender_id(
job
)
for row in recipient_csv.get_rows():
if row.index > resume_from_row:

View File

@@ -0,0 +1,190 @@
import json
from flask import current_app
from requests import HTTPError, request
from app.celery.process_ses_receipts_tasks import process_ses_results
from app.config import QueueNames
from app.dao.notifications_dao import get_notification_by_id
from app.models import SMS_TYPE
temp_fail = "2028675303"
perm_fail = "2028675302"
delivered = "2028675309"
delivered_email = "delivered@simulator.notify"
perm_fail_email = "perm-fail@simulator.notify"
temp_fail_email = "temp-fail@simulator.notify"
def send_sms_response(provider, reference):
body = sns_callback(reference)
headers = {"Content-type": "application/json"}
make_request(SMS_TYPE, provider, body, headers)
def send_email_response(reference, to):
if to == perm_fail_email:
body = ses_hard_bounce_callback(reference)
elif to == temp_fail_email:
body = ses_soft_bounce_callback(reference)
else:
body = ses_notification_callback(reference)
process_ses_results.apply_async([body], queue=QueueNames.SEND_EMAIL)
def make_request(notification_type, provider, data, headers):
api_call = "{}/notifications/{}/{}".format(
current_app.config["API_HOST_NAME"], notification_type, provider
)
try:
response = request("POST", api_call, headers=headers, data=data, timeout=60)
response.raise_for_status()
except HTTPError as e:
current_app.logger.error(
"API POST request on {} failed with status {}".format(
api_call, e.response.status_code
)
)
raise e
finally:
current_app.logger.info("Mocked provider callback request finished")
return response.json()
def sns_callback(notification_id):
notification = get_notification_by_id(notification_id)
# This will only work if all notifications, including successful ones, are in the notifications table
# If we decide to delete successful notifications, we will have to get this from notifications history
return json.dumps(
{
"CID": str(notification_id),
"status": notification.status,
# "deliverytime": notification.completed_at
}
)
def ses_notification_callback(reference):
ses_message_body = {
"delivery": {
"processingTimeMillis": 2003,
"recipients": ["success@simulator.amazonses.com"],
"remoteMtaIp": "123.123.123.123",
"reportingMTA": "a7-32.smtp-out.us-west-2.amazonses.com",
"smtpResponse": "250 2.6.0 Message received",
"timestamp": "2017-11-17T12:14:03.646Z",
},
"mail": {
"commonHeaders": {
"from": ["TEST <TEST@notify.works>"],
"subject": "lambda test",
"to": ["success@simulator.amazonses.com"],
},
"destination": ["success@simulator.amazonses.com"],
"headers": [
{"name": "From", "value": "TEST <TEST@notify.works>"},
{"name": "To", "value": "success@simulator.amazonses.com"},
{"name": "Subject", "value": "lambda test"},
{"name": "MIME-Version", "value": "1.0"},
{
"name": "Content-Type",
"value": 'multipart/alternative; boundary="----=_Part_617203_1627511946.1510920841645"',
},
],
"headersTruncated": False,
"messageId": reference,
"sendingAccountId": "12341234",
"source": '"TEST" <TEST@notify.works>',
"sourceArn": "arn:aws:ses:us-west-2:12341234:identity/notify.works",
"sourceIp": "0.0.0.1",
"timestamp": "2017-11-17T12:14:01.643Z",
},
"notificationType": "Delivery",
}
return {
"Type": "Notification",
"MessageId": "8e83c020-1234-1234-1234-92a8ee9baa0a",
"TopicArn": "arn:aws:sns:us-west-2:12341234:ses_notifications",
"Subject": None,
"Message": json.dumps(ses_message_body),
"Timestamp": "2017-11-17T12:14:03.710Z",
"SignatureVersion": "1",
"Signature": "[REDACTED]",
"SigningCertUrl": "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-[REDACTED].pem",
"UnsubscribeUrl": "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REACTED]",
"MessageAttributes": {},
}
def ses_hard_bounce_callback(reference):
return _ses_bounce_callback(reference, "Permanent")
def ses_soft_bounce_callback(reference):
return _ses_bounce_callback(reference, "Temporary")
def _ses_bounce_callback(reference, bounce_type):
ses_message_body = {
"bounce": {
"bounceSubType": "General",
"bounceType": bounce_type,
"bouncedRecipients": [
{
"action": "failed",
"diagnosticCode": "smtp; 550 5.1.1 user unknown",
"emailAddress": "bounce@simulator.amazonses.com",
"status": "5.1.1",
}
],
"feedbackId": "0102015fc9e676fb-12341234-1234-1234-1234-9301e86a4fa8-000000",
"remoteMtaIp": "123.123.123.123",
"reportingMTA": "dsn; a7-31.smtp-out.us-west-2.amazonses.com",
"timestamp": "2017-11-17T12:14:05.131Z",
},
"mail": {
"commonHeaders": {
"from": ["TEST <TEST@notify.works>"],
"subject": "ses callback test",
"to": ["bounce@simulator.amazonses.com"],
},
"destination": ["bounce@simulator.amazonses.com"],
"headers": [
{"name": "From", "value": "TEST <TEST@notify.works>"},
{"name": "To", "value": "bounce@simulator.amazonses.com"},
{"name": "Subject", "value": "lambda test"},
{"name": "MIME-Version", "value": "1.0"},
{
"name": "Content-Type",
"value": 'multipart/alternative; boundary="----=_Part_596529_2039165601.1510920843367"',
},
],
"headersTruncated": False,
"messageId": reference,
"sendingAccountId": "12341234",
"source": '"TEST" <TEST@notify.works>',
"sourceArn": "arn:aws:ses:us-west-2:12341234:identity/notify.works",
"sourceIp": "0.0.0.1",
"timestamp": "2017-11-17T12:14:03.000Z",
},
"notificationType": "Bounce",
}
return {
"Type": "Notification",
"MessageId": "36e67c28-1234-1234-1234-2ea0172aa4a7",
"TopicArn": "arn:aws:sns:us-west-2:12341234:ses_notifications",
"Subject": None,
"Message": json.dumps(ses_message_body),
"Timestamp": "2017-11-17T12:14:05.149Z",
"SignatureVersion": "1",
"Signature": "[REDACTED]", # noqa
"SigningCertUrl": "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-[REDACTED]].pem",
"UnsubscribeUrl": "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=[REDACTED]]",
"MessageAttributes": {},
}