mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-01 15:46:07 -05:00
clean up comments and method dupes
This commit is contained in:
@@ -97,13 +97,15 @@ def sns_callback_handler():
|
|||||||
|
|
||||||
|
|
||||||
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
|
@notify_celery.task(bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300)
|
||||||
@statsd(namespace="tasks")
|
|
||||||
def process_ses_results(self, response):
|
def process_ses_results(self, response):
|
||||||
try:
|
try:
|
||||||
ses_message = json.loads(response["Message"])
|
ses_message = json.loads(response["Message"])
|
||||||
notification_type = ses_message["notificationType"]
|
notification_type = ses_message["notificationType"]
|
||||||
print(f"ses_message is: {ses_message}")
|
bounce_message = None
|
||||||
if notification_type == "Complaint":
|
|
||||||
|
if notification_type == 'Bounce':
|
||||||
|
bounce_message = _determine_notification_bounce_type(ses_message)
|
||||||
|
elif notification_type == 'Complaint':
|
||||||
_check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
|
_check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -111,21 +113,26 @@ def process_ses_results(self, response):
|
|||||||
|
|
||||||
notification_status = aws_response_dict["notification_status"]
|
notification_status = aws_response_dict["notification_status"]
|
||||||
reference = ses_message["mail"]["messageId"]
|
reference = ses_message["mail"]["messageId"]
|
||||||
|
|
||||||
print(f"notification_status is: {notification_status}")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
notification = notifications_dao.dao_get_notification_by_reference(reference)
|
notification = notifications_dao.dao_get_notification_by_reference(reference)
|
||||||
except NoResultFound:
|
except NoResultFound:
|
||||||
message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace(tzinfo=None)
|
message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace(tzinfo=None)
|
||||||
if datetime.utcnow() - message_time < timedelta(minutes=5):
|
if datetime.utcnow() - message_time < timedelta(minutes=5):
|
||||||
|
current_app.logger.info(
|
||||||
|
f"notification not found for reference: {reference} (while attempting update to {notification_status}). "
|
||||||
|
f"Callback may have arrived before notification was persisted to the DB. Adding task to retry queue"
|
||||||
|
)
|
||||||
self.retry(queue=QueueNames.RETRY)
|
self.retry(queue=QueueNames.RETRY)
|
||||||
else:
|
else:
|
||||||
current_app.logger.warning(
|
current_app.logger.warning(
|
||||||
"notification not found for reference: {} (update to {})".format(reference, notification_status)
|
"notification not found for reference: {} (while attempting update to {})".format(reference, notification_status)
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if bounce_message:
|
||||||
|
current_app.logger.info(f"SES bounce for notification ID {notification.id}: {bounce_message}")
|
||||||
|
|
||||||
if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
|
if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
|
||||||
notifications_dao._duplicate_update_warning(
|
notifications_dao._duplicate_update_warning(
|
||||||
notification,
|
notification,
|
||||||
@@ -166,71 +173,3 @@ def process_ses_results(self, response):
|
|||||||
current_app.logger.exception("Error processing SES results: {}".format(type(e)))
|
current_app.logger.exception("Error processing SES results: {}".format(type(e)))
|
||||||
self.retry(queue=QueueNames.RETRY)
|
self.retry(queue=QueueNames.RETRY)
|
||||||
|
|
||||||
# def process_ses_results(self, response):
|
|
||||||
# try:
|
|
||||||
# ses_message = json.loads(response['Message'])
|
|
||||||
# print(f"ses_message is {ses_message}")
|
|
||||||
# notification_type = ses_message['notificationType']
|
|
||||||
# print(f"notification_type is {notification_type}")
|
|
||||||
# if notification_type == 'Bounce':
|
|
||||||
# notification_type = _determine_notification_bounce_type(ses_message)
|
|
||||||
# elif notification_type == 'Complaint':
|
|
||||||
# _check_and_queue_complaint_callback_task(*handle_complaint(ses_message))
|
|
||||||
# return True
|
|
||||||
# aws_response_dict = get_aws_responses(notification_type)
|
|
||||||
# print(f"aws_response_dict is {aws_response_dict}")
|
|
||||||
# notification_status = aws_response_dict['notification_status']
|
|
||||||
# print(f"notification_status is {notification_status}")
|
|
||||||
# reference = ses_message['mail']['messageId']
|
|
||||||
# try:
|
|
||||||
# notification = notifications_dao.dao_get_notification_by_reference(reference)
|
|
||||||
# print(f"notification is {notification}")
|
|
||||||
# except NoResultFound:
|
|
||||||
# print(f"notification not found")
|
|
||||||
# message_time = iso8601.parse_date(ses_message['mail']['timestamp']).replace(tzinfo=None)
|
|
||||||
# if datetime.utcnow() - message_time < timedelta(minutes=5):
|
|
||||||
# self.retry(queue=QueueNames.RETRY)
|
|
||||||
# else:
|
|
||||||
# current_app.logger.warning(
|
|
||||||
# "notification not found for reference: {} (update to {})".format(reference, notification_status)
|
|
||||||
# )
|
|
||||||
# return
|
|
||||||
# print(f"notification.status is {notification.status}")
|
|
||||||
# if notification.status not in {NOTIFICATION_SENDING, NOTIFICATION_PENDING}:
|
|
||||||
# print(f"notification.status is not in [{NOTIFICATION_SENDING}, {NOTIFICATION_PENDING}]")
|
|
||||||
# notifications_dao._duplicate_update_warning(notification, notification_status)
|
|
||||||
# return
|
|
||||||
# notifications_dao._update_notification_status(
|
|
||||||
# notification=notification,
|
|
||||||
# status=notification_status,
|
|
||||||
# provider_response=None
|
|
||||||
# )
|
|
||||||
# if not aws_response_dict['success']:
|
|
||||||
# current_app.logger.info(
|
|
||||||
# "SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
|
|
||||||
# notification.id, reference, aws_response_dict['message']
|
|
||||||
# )
|
|
||||||
# )
|
|
||||||
# print(
|
|
||||||
# "SES delivery failed: notification id {} and reference {} has error found. Status {}".format(
|
|
||||||
# notification.id, reference, aws_response_dict['message']
|
|
||||||
# )
|
|
||||||
# )
|
|
||||||
# else:
|
|
||||||
# current_app.logger.info('SES callback return status of {} for notification: {}'.format(
|
|
||||||
# notification_status, notification.id
|
|
||||||
# ))
|
|
||||||
# print('SES callback return status of {} for notification: {}'.format(
|
|
||||||
# notification_status, notification.id
|
|
||||||
# ))
|
|
||||||
# statsd_client.incr('callback.ses.{}'.format(notification_status))
|
|
||||||
# if notification.sent_at:
|
|
||||||
# statsd_client.timing_with_dates('callback.ses.elapsed-time', datetime.utcnow(), notification.sent_at)
|
|
||||||
# check_and_queue_callback_task(notification)
|
|
||||||
# return True
|
|
||||||
# except Retry:
|
|
||||||
# raise
|
|
||||||
# except Exception as e:
|
|
||||||
# current_app.logger.exception('Error processing SES results: {}'.format(type(e)))
|
|
||||||
# self.retry(queue=QueueNames.RETRY)
|
|
||||||
|
|
||||||
@@ -21,7 +21,7 @@ from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
|
|||||||
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
|
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
|
||||||
from app.dao.notifications_dao import (
|
from app.dao.notifications_dao import (
|
||||||
dao_get_last_notification_added_for_job_id,
|
dao_get_last_notification_added_for_job_id,
|
||||||
dao_get_notification_or_history_by_reference,
|
dao_get_notification_history_by_reference,
|
||||||
dao_update_notifications_by_reference,
|
dao_update_notifications_by_reference,
|
||||||
get_notification_by_id,
|
get_notification_by_id,
|
||||||
update_notification_status_by_reference,
|
update_notification_status_by_reference,
|
||||||
@@ -547,7 +547,7 @@ def update_letter_notification(filename, temporary_failures, update):
|
|||||||
|
|
||||||
|
|
||||||
def check_billable_units(notification_update):
|
def check_billable_units(notification_update):
|
||||||
notification = dao_get_notification_or_history_by_reference(notification_update.reference)
|
notification = dao_get_notification_history_by_reference(notification_update.reference)
|
||||||
|
|
||||||
if int(notification_update.page_count) != notification.billable_units:
|
if int(notification_update.page_count) != notification.billable_units:
|
||||||
msg = 'Notification with id {} has {} billable_units but DVLA says page count is {}'.format(
|
msg = 'Notification with id {} has {} billable_units but DVLA says page count is {}'.format(
|
||||||
|
|||||||
@@ -592,18 +592,6 @@ def dao_get_notification_by_reference(reference):
|
|||||||
).one()
|
).one()
|
||||||
|
|
||||||
|
|
||||||
def dao_get_notification_or_history_by_reference(reference):
|
|
||||||
try:
|
|
||||||
# This try except is necessary because in test keys and research mode does not create notification history.
|
|
||||||
# Otherwise we could just search for the NotificationHistory object
|
|
||||||
return Notification.query.filter(
|
|
||||||
Notification.reference == reference
|
|
||||||
).one()
|
|
||||||
except NoResultFound:
|
|
||||||
return NotificationHistory.query.filter(
|
|
||||||
NotificationHistory.reference == reference
|
|
||||||
).one()
|
|
||||||
|
|
||||||
def dao_get_notification_history_by_reference(reference):
|
def dao_get_notification_history_by_reference(reference):
|
||||||
try:
|
try:
|
||||||
# This try except is necessary because in test keys and research mode does not create notification history.
|
# This try except is necessary because in test keys and research mode does not create notification history.
|
||||||
|
|||||||
@@ -163,8 +163,8 @@ def update_notification_to_sending(notification, provider):
|
|||||||
notification.sent_at = datetime.utcnow()
|
notification.sent_at = datetime.utcnow()
|
||||||
notification.sent_by = provider.name
|
notification.sent_by = provider.name
|
||||||
if notification.status not in NOTIFICATION_STATUS_TYPES_COMPLETED:
|
if notification.status not in NOTIFICATION_STATUS_TYPES_COMPLETED:
|
||||||
# We currently have no callback method for SNS
|
# We currently have no callback method for SMS deliveries
|
||||||
# TODO create celery task to request delivery receipts from cloudwatch api
|
# TODO create celery task to request SMS delivery receipts from cloudwatch api
|
||||||
notification.status = NOTIFICATION_SENT if notification.notification_type == "sms" else NOTIFICATION_SENDING
|
notification.status = NOTIFICATION_SENT if notification.notification_type == "sms" else NOTIFICATION_SENDING
|
||||||
|
|
||||||
dao_update_notification(notification)
|
dao_update_notification(notification)
|
||||||
|
|||||||
@@ -112,8 +112,8 @@ def remove_mail_headers(dict_to_edit):
|
|||||||
|
|
||||||
def remove_emails_from_bounce(bounce_dict):
|
def remove_emails_from_bounce(bounce_dict):
|
||||||
remove_mail_headers(bounce_dict)
|
remove_mail_headers(bounce_dict)
|
||||||
bounce_dict["mail"].pop("destination")
|
bounce_dict["mail"].pop("destination", None)
|
||||||
bounce_dict["bounce"].pop("bouncedRecipients")
|
bounce_dict["bounce"].pop("bouncedRecipients", None)
|
||||||
|
|
||||||
|
|
||||||
def remove_emails_from_complaint(complaint_dict):
|
def remove_emails_from_complaint(complaint_dict):
|
||||||
@@ -121,6 +121,7 @@ def remove_emails_from_complaint(complaint_dict):
|
|||||||
complaint_dict["complaint"].pop("complainedRecipients")
|
complaint_dict["complaint"].pop("complainedRecipients")
|
||||||
return complaint_dict["mail"].pop("destination")
|
return complaint_dict["mail"].pop("destination")
|
||||||
|
|
||||||
|
|
||||||
def check_and_queue_callback_task(notification):
|
def check_and_queue_callback_task(notification):
|
||||||
# queue callback task only if the service_callback_api exists
|
# queue callback task only if the service_callback_api exists
|
||||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
||||||
@@ -128,17 +129,11 @@ def check_and_queue_callback_task(notification):
|
|||||||
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
|
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
|
||||||
send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS)
|
send_delivery_status_to_service.apply_async([str(notification.id), notification_data], queue=QueueNames.CALLBACKS)
|
||||||
|
|
||||||
def check_and_queue_callback_task(notification):
|
|
||||||
# queue callback task only if the service_callback_api exists
|
|
||||||
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
|
|
||||||
if service_callback_api:
|
|
||||||
notification_data = create_delivery_status_callback_data(notification, service_callback_api)
|
|
||||||
send_delivery_status_to_service.apply_async([str(notification.id), notification_data],
|
|
||||||
queue=QueueNames.CALLBACKS)
|
|
||||||
|
|
||||||
def _check_and_queue_complaint_callback_task(complaint, notification, recipient):
|
def _check_and_queue_complaint_callback_task(complaint, notification, recipient):
|
||||||
# queue callback task only if the service_callback_api exists
|
# queue callback task only if the service_callback_api exists
|
||||||
service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id)
|
service_callback_api = get_service_complaint_callback_api_for_service(service_id=notification.service_id)
|
||||||
if service_callback_api:
|
if service_callback_api:
|
||||||
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
|
complaint_data = create_complaint_callback_data(complaint, notification, service_callback_api, recipient)
|
||||||
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
|
send_complaint_to_service.apply_async([complaint_data], queue=QueueNames.CALLBACKS)
|
||||||
|
|
||||||
@@ -191,7 +191,7 @@ def test_ses_callback_should_log_if_notification_is_missing(client, _notify_db,
|
|||||||
with freeze_time('2017-11-17T12:34:03.646Z'):
|
with freeze_time('2017-11-17T12:34:03.646Z'):
|
||||||
assert process_ses_results(ses_notification_callback(reference='ref')) is None
|
assert process_ses_results(ses_notification_callback(reference='ref')) is None
|
||||||
assert mock_retry.call_count == 0
|
assert mock_retry.call_count == 0
|
||||||
mock_logger.assert_called_once_with('notification not found for reference: ref (update to delivered)')
|
mock_logger.assert_called_once_with('notification not found for reference: ref (while attempting update to delivered)')
|
||||||
def test_ses_callback_should_not_retry_if_notification_is_old(client, _notify_db, mocker):
|
def test_ses_callback_should_not_retry_if_notification_is_old(client, _notify_db, mocker):
|
||||||
mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry')
|
mock_retry = mocker.patch('app.celery.process_ses_receipts_tasks.process_ses_results.retry')
|
||||||
mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error')
|
mock_logger = mocker.patch('app.celery.process_ses_receipts_tasks.current_app.logger.error')
|
||||||
|
|||||||
@@ -146,16 +146,6 @@ def create_sample_notification(
|
|||||||
data["job_row_number"] = job_row_number
|
data["job_row_number"] = job_row_number
|
||||||
notification = Notification(**data)
|
notification = Notification(**data)
|
||||||
dao_create_notification(notification)
|
dao_create_notification(notification)
|
||||||
# if scheduled_for:
|
|
||||||
# scheduled_notification = ScheduledNotification(
|
|
||||||
# id=uuid.uuid4(),
|
|
||||||
# notification_id=notification.id,
|
|
||||||
# scheduled_for=datetime.strptime(scheduled_for, "%Y-%m-%d %H:%M"),
|
|
||||||
# )
|
|
||||||
# if status != "created":
|
|
||||||
# scheduled_notification.pending = False
|
|
||||||
# db.session.add(scheduled_notification)
|
|
||||||
# db.session.commit()
|
|
||||||
|
|
||||||
return notification
|
return notification
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ from app.dao.notifications_dao import (
|
|||||||
dao_get_letters_to_be_printed,
|
dao_get_letters_to_be_printed,
|
||||||
dao_get_notification_by_reference,
|
dao_get_notification_by_reference,
|
||||||
dao_get_notification_count_for_job_id,
|
dao_get_notification_count_for_job_id,
|
||||||
dao_get_notification_or_history_by_reference,
|
dao_get_notification_history_by_reference,
|
||||||
dao_get_notifications_by_recipient_or_reference,
|
dao_get_notifications_by_recipient_or_reference,
|
||||||
dao_timeout_notifications,
|
dao_timeout_notifications,
|
||||||
dao_update_notification,
|
dao_update_notification,
|
||||||
@@ -1607,28 +1607,28 @@ def test_dao_get_notification_by_reference_with_no_matches_raises_error(notify_d
|
|||||||
dao_get_notification_by_reference('REF1')
|
dao_get_notification_by_reference('REF1')
|
||||||
|
|
||||||
|
|
||||||
def test_dao_get_notification_or_history_by_reference_with_one_match_returns_notification(
|
def test_dao_get_notification_history_by_reference_with_one_match_returns_notification(
|
||||||
sample_letter_template
|
sample_letter_template
|
||||||
):
|
):
|
||||||
create_notification(template=sample_letter_template, reference='REF1')
|
create_notification(template=sample_letter_template, reference='REF1')
|
||||||
notification = dao_get_notification_or_history_by_reference('REF1')
|
notification = dao_get_notification_history_by_reference('REF1')
|
||||||
|
|
||||||
assert notification.reference == 'REF1'
|
assert notification.reference == 'REF1'
|
||||||
|
|
||||||
|
|
||||||
def test_dao_get_notification_or_history_by_reference_with_multiple_matches_raises_error(
|
def test_dao_get_notification_history_by_reference_with_multiple_matches_raises_error(
|
||||||
sample_letter_template
|
sample_letter_template
|
||||||
):
|
):
|
||||||
create_notification(template=sample_letter_template, reference='REF1')
|
create_notification(template=sample_letter_template, reference='REF1')
|
||||||
create_notification(template=sample_letter_template, reference='REF1')
|
create_notification(template=sample_letter_template, reference='REF1')
|
||||||
|
|
||||||
with pytest.raises(SQLAlchemyError):
|
with pytest.raises(SQLAlchemyError):
|
||||||
dao_get_notification_or_history_by_reference('REF1')
|
dao_get_notification_history_by_reference('REF1')
|
||||||
|
|
||||||
|
|
||||||
def test_dao_get_notification_or_history_by_reference_with_no_matches_raises_error(notify_db_session):
|
def test_dao_get_notification_history_by_reference_with_no_matches_raises_error(notify_db_session):
|
||||||
with pytest.raises(SQLAlchemyError):
|
with pytest.raises(SQLAlchemyError):
|
||||||
dao_get_notification_or_history_by_reference('REF1')
|
dao_get_notification_history_by_reference('REF1')
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("notification_type",
|
@pytest.mark.parametrize("notification_type",
|
||||||
|
|||||||
Reference in New Issue
Block a user