From d6e5b6730eb2cd45b3ea0eadab7cb5a348c7dde6 Mon Sep 17 00:00:00 2001 From: Cliff Hill Date: Tue, 3 Dec 2024 11:26:15 -0500 Subject: [PATCH] restructured retries a lot. Signed-off-by: Cliff Hill --- app/celery/process_ses_receipts_tasks.py | 14 +-- app/celery/provider_tasks.py | 128 +++++++++++++++-------- app/celery/service_callback_tasks.py | 75 +++++++++---- app/celery/tasks.py | 113 +++++++++++++------- 4 files changed, 219 insertions(+), 111 deletions(-) diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index c03df0c98..c202d6d42 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -1,7 +1,6 @@ from datetime import timedelta import iso8601 -from celery.exceptions import Retry from flask import current_app, json from sqlalchemy.orm.exc import NoResultFound @@ -26,7 +25,11 @@ from app.utils import utc_now @notify_celery.task( - bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300 + bind=True, + name="process-ses-result", + max_retries=5, + default_retry_delay=300, + autoretry_for=(Exception,), ) def process_ses_results(self, response): try: @@ -65,7 +68,7 @@ def process_ses_results(self, response): f"Callback may have arrived before notification was" f"persisted to the DB. 
Adding task to retry queue" ) - self.retry(queue=QueueNames.RETRY) + raise else: current_app.logger.warning( f"Notification not found for reference: {reference} " @@ -110,12 +113,9 @@ def process_ses_results(self, response): return True - except Retry: - raise - except Exception: current_app.logger.exception("Error processing SES results") - self.retry(queue=QueueNames.RETRY) + raise def determine_notification_bounce_type(ses_message): diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index b0c6a4c9b..5784f4f57 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -1,6 +1,8 @@ import json +import logging import os from datetime import timedelta +from functools import wraps from botocore.exceptions import ClientError from flask import current_app @@ -31,6 +33,7 @@ DELIVERY_RECEIPT_DELAY_IN_SECONDS = 30 name="check_sms_delivery_receipt", max_retries=48, default_retry_delay=300, + autoretry_for=(NotificationTechnicalFailureException, ClientError), ) def check_sms_delivery_receipt(self, message_id, notification_id, sent_at): """ @@ -52,7 +55,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at): status, provider_response, carrier = aws_cloudwatch_client.check_sms( message_id, notification_id, sent_at ) - except NotificationTechnicalFailureException as ntfe: + except NotificationTechnicalFailureException: provider_response = "Unable to find carrier response -- still looking" status = "pending" carrier = "" @@ -62,7 +65,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at): carrier=carrier, provider_response=provider_response, ) - raise self.retry(exc=ntfe) + raise except ClientError as err: # Probably a ThrottlingException but could be something else error_code = err.response["Error"]["Code"] @@ -77,7 +80,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at): carrier=carrier, provider_response=provider_response, ) - raise self.retry(exc=err) + 
raise if status == "success": status = NotificationStatus.DELIVERED @@ -104,8 +107,36 @@ ) +def _deliver_sms_task_handler(func): + """Handle the max retries exceeded error case for delivering sms notifications.""" + + @wraps(func) + def deliver_sms_task_wrapper(self, notification_id): + try: + return func(self, notification_id) + except self.MaxRetriesExceededError: + message = ( + "RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. " + "Notification has been updated to technical-failure".format( + notification_id + ) + ) + update_notification_status_by_id( + notification_id, + NotificationStatus.TECHNICAL_FAILURE, + ) + raise NotificationTechnicalFailureException(message) + + return deliver_sms_task_wrapper + + +@_deliver_sms_task_handler @notify_celery.task( - bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300 + bind=True, + name="deliver_sms", + max_retries=48, + default_retry_delay=300, + autoretry_for=(Exception,), ) def deliver_sms(self, notification_id): """Branch off to the final step in delivering the notification to sns and get delivery receipts.""" @@ -141,26 +172,35 @@ def deliver_sms(self, notification_id): notification_id, NotificationStatus.TEMPORARY_FAILURE, ) - if isinstance(e, SmsClientResponseException): - current_app.logger.warning( - "SMS notification delivery for id: {} failed".format(notification_id), - ) - else: - current_app.logger.exception( - "SMS notification delivery for id: {} failed".format(notification_id), - ) + if isinstance(e, SmsClientResponseException): + log_lvl = logging.WARNING + log_exc_info = False + else: + log_lvl = logging.ERROR + log_exc_info = True + + current_app.logger.log( + level=log_lvl, + msg=f"SMS notification delivery for id: {notification_id} failed", + exc_info=log_exc_info, + ) + + raise + + +def _deliver_email_task_handler(func): + """Handle the max retries exceeded error 
case for delivering email notifications.""" + + @wraps(func) + def deliver_email_task_wrapper(self, notification_id): try: - if self.request.retries == 0: - self.retry(queue=QueueNames.RETRY, countdown=0) - else: - self.retry(queue=QueueNames.RETRY) + return func(self, notification_id) except self.MaxRetriesExceededError: message = ( - "RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. " - "Notification has been updated to technical-failure".format( - notification_id - ) + "RETRY FAILED: Max retries reached. " + f"The task send_email_to_provider failed for notification {notification_id}. " + "Notification has been updated to technical-failure" ) update_notification_status_by_id( notification_id, @@ -168,9 +208,17 @@ def deliver_sms(self, notification_id): ) raise NotificationTechnicalFailureException(message) + return deliver_email_task_wrapper + +@_deliver_email_task_handler @notify_celery.task( - bind=True, name="deliver_email", max_retries=48, default_retry_delay=30 + bind=True, + name="deliver_email", + max_retries=48, + default_retry_delay=30, + autoretry_for=(Exception,), + dont_autoretry_for=(EmailClientNonRetryableException,), ) def deliver_email(self, notification_id): try: @@ -191,29 +239,19 @@ def deliver_email(self, notification_id): send_to_providers.send_email_to_provider(notification) except EmailClientNonRetryableException: current_app.logger.exception(f"Email notification {notification_id} failed") - update_notification_status_by_id(notification_id, "technical-failure") + update_notification_status_by_id( + notification_id, + NotificationStatus.TECHNICAL_FAILURE, + ) + raise except Exception as e: - try: - if isinstance(e, AwsSesClientThrottlingSendRateException): - current_app.logger.warning( - f"RETRY: Email notification {notification_id} was rate limited by SES" - ) - else: - current_app.logger.exception( - f"RETRY: Email notification {notification_id} failed" - ) + if isinstance(e, 
AwsSesClientThrottlingSendRateException): + current_app.logger.warning( + f"RETRY: Email notification {notification_id} was rate limited by SES" + ) + else: + current_app.logger.exception( + f"RETRY: Email notification {notification_id} failed" + ) - self.retry(queue=QueueNames.RETRY) - except self.MaxRetriesExceededError: - message = ( - "RETRY FAILED: Max retries reached. " - "The task send_email_to_provider failed for notification {}. " - "Notification has been updated to technical-failure".format( - notification_id - ) - ) - update_notification_status_by_id( - notification_id, - NotificationStatus.TECHNICAL_FAILURE, - ) - raise NotificationTechnicalFailureException(message) + raise diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index b35edc5e7..56e24665c 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -1,13 +1,57 @@ import json +from functools import wraps +from inspect import signature from flask import current_app from requests import HTTPError, RequestException, request from app import encryption, notify_celery -from app.config import QueueNames from app.utils import DATETIME_FORMAT +def _send_to_service_task_handler(func): + @wraps(func) + def send_to_service_task_wrapper(*args, **kwargs): + sig = signature(func) + bargs = sig.bind(*args, **kwargs) + bargs.apply_defaults() + + function_name = func.__name__ + + if function_name == "send_delivery_status_to_service": + encrypted_status_update = bargs.arguments["encrypted_status_update"] + + status_update = encryption.decrypt(encrypted_status_update) + service_callback_url = status_update["service_callback_api_url"] + + notification_id = bargs.arguments["notification_id"] + + elif function_name == "send_complaint_to_service": + complaint_data = bargs.arguments["complaint_data"] + + notification_id = complaint_data["notification_id"] + service_callback_url = complaint_data["service_callback_api_url"] + + else: + raise 
ValueError( + f"Incorrect send to service function name found: {function_name}" + ) + + self_ = bargs.arguments["self"] + + try: + return func(*args, **kwargs) + except self_.MaxRetriesExceededError: + current_app.logger.warning( + f"Retry: {function_name} has retried the max num of times for callback url " + f"{service_callback_url} and notification_id: {notification_id}" + ) + raise + + return send_to_service_task_wrapper + + +@_send_to_service_task_handler @notify_celery.task( bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300 ) @@ -36,6 +80,7 @@ def send_delivery_status_to_service(self, notification_id, encrypted_status_upda ) +@_send_to_service_task_handler @notify_celery.task( bind=True, name="send-complaint", max_retries=5, default_retry_delay=300 ) @@ -72,43 +117,29 @@ def _send_data_to_service_callback_api( data=json.dumps(data), headers={ "Content-Type": "application/json", - "Authorization": "Bearer {}".format(token), + "Authorization": f"Bearer {token}", }, timeout=5, ) current_app.logger.info( - "{} sending {} to {}, response {}".format( - function_name, - notification_id, - service_callback_url, - response.status_code, - ) + f"{function_name} sending {notification_id} to {service_callback_url}, response {response.status_code}" ) response.raise_for_status() except RequestException as e: current_app.logger.warning( - "{} request failed for notification_id: {} and url: {}. exception: {}".format( - function_name, notification_id, service_callback_url, e - ) + f"{function_name} request failed for notification_id: {notification_id} and " + f"url: {service_callback_url}. 
exception: {e}" ) if ( not isinstance(e, HTTPError) or e.response.status_code >= 500 or e.response.status_code == 429 ): - try: - self.retry(queue=QueueNames.CALLBACKS_RETRY) - except self.MaxRetriesExceededError: - current_app.logger.warning( - "Retry: {} has retried the max num of times for callback url {} and notification_id: {}".format( - function_name, service_callback_url, notification_id - ) - ) + raise else: current_app.logger.warning( - "{} callback is not being retried for notification_id: {} and url: {}. exception: {}".format( - function_name, notification_id, service_callback_url, e - ) + f"{function_name} callback is not being retried for notification_id: " + f"{notification_id} and url: {service_callback_url}. exception: {e}" ) diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 0794ad4da..3969ac192 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -1,4 +1,6 @@ import json +from functools import wraps +from inspect import signature from flask import current_app from requests import HTTPError, RequestException, request @@ -10,10 +12,7 @@ from app.celery import provider_tasks from app.config import QueueNames from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job -from app.dao.notifications_dao import ( - dao_get_last_notification_added_for_job_id, - get_notification_by_id, -) +from app.dao.notifications_dao import dao_get_last_notification_added_for_job_id from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id @@ -166,6 +165,31 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id): return True +def _save_task_hander(func): + @wraps(func) + def save_task_wrapper(*args, **kwargs): + sig = signature(func) + bargs = sig.bind(*args, **kwargs) + bargs.apply_defaults() + + task = 
bargs.arguments["self"] + notification_id = bargs.arguments["notification_id"] + notification = encryption.decrypt(bargs.arguments["encrypted_notification"]) + + try: + return func(*args, **kwargs) + except task.MaxRetriesExceededError: + retry_msg = ( + f"{task.__name__} notification for job {notification.get('job', None)} " + f"row number {notification.get('row_number', None)} and notification id {notification_id}" + ) + current_app.logger.exception("Max retry failed" + retry_msg) + raise + + return save_task_wrapper + + +@_save_task_hander @notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300) def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None): """Persist notification to db and place notification in queue to send to sns.""" @@ -242,10 +266,16 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i ) ) - except SQLAlchemyError as e: - handle_exception(self, notification, notification_id, e) + except SQLAlchemyError: + retry_msg = ( + f"{self.__name__} notification for job {notification.get('job', None)} " + f"row number {notification.get('row_number', None)} and notification id {notification_id}" + ) + current_app.logger.exception(retry_msg) + raise +@_save_task_hander @notify_celery.task( bind=True, name="save-email", max_retries=5, default_retry_delay=300 ) @@ -298,10 +328,35 @@ def save_email( saved_notification.id, saved_notification.created_at ) ) - except SQLAlchemyError as e: - handle_exception(self, notification, notification_id, e) + except SQLAlchemyError: + retry_msg = ( + f"{self.__name__} notification for job {notification.get('job', None)} " + f"row number {notification.get('row_number', None)} and notification id {notification_id}" + ) + current_app.logger.exception(retry_msg) + raise +def _save_api_task_handler(func): + @wraps(func) + def save_api_task_wrapper(*args, **kwargs): + sig = signature(func) + bargs = sig.bind(*args, **kwargs) + 
bargs.apply_defaults() + + self_ = bargs.arguments["self"] + notification = encryption.decrypt(bargs.arguments["encrypted_notification"]) + try: + return func(*args, **kwargs) + except self_.MaxRetriesExceededError: + current_app.logger.exception( + f"Max retry failed Failed to persist notification {notification['id']}", + ) + + return save_api_task_wrapper + + +@_save_api_task_handler @notify_celery.task( bind=True, name="save-api-email", max_retries=5, default_retry_delay=300 ) @@ -309,6 +364,7 @@ def save_api_email(self, encrypted_notification): save_api_email_or_sms(self, encrypted_notification) +@_save_api_task_handler @notify_celery.task( bind=True, name="save-api-sms", max_retries=5, default_retry_delay=300 ) @@ -360,34 +416,23 @@ def save_api_email_or_sms(self, encrypted_notification): # up retrying because IntegrityError is a subclass of SQLAlchemyError return - except SQLAlchemyError: + +def _send_inbound_sms_to_service_handler(func): + @wraps(func) + def send_inbound_sms_to_service_wrapper(self, inbound_sms_id, service_id): try: - self.retry(queue=QueueNames.RETRY) + return func(self, inbound_sms_id, service_id) except self.MaxRetriesExceededError: current_app.logger.exception( - f"Max retry failed Failed to persist notification {notification['id']}", + "Retry: send_inbound_sms_to_service has retried the max number of" + + f"times for service: {service_id} and inbound_sms {inbound_sms_id}" ) + raise + + return send_inbound_sms_to_service_wrapper -def handle_exception(task, notification, notification_id, exc): - if not get_notification_by_id(notification_id): - retry_msg = "{task} notification for job {job} row number {row} and notification id {noti}".format( - task=task.__name__, - job=notification.get("job", None), - row=notification.get("row_number", None), - noti=notification_id, - ) - # Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems - # SQLAlchemy is throwing a FlushError. 
So we check if the notification id already exists then do not - # send to the retry queue. - # This probably (hopefully) is not an issue with Redis as the celery backing store - current_app.logger.exception("Retry" + retry_msg) - try: - task.retry(queue=QueueNames.RETRY, exc=exc) - except task.MaxRetriesExceededError: - current_app.logger.exception("Max retry failed" + retry_msg) - - +@_send_inbound_sms_to_service_handler @notify_celery.task( bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300 ) @@ -431,13 +476,7 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): + f"and url: {inbound_api.url}. exception: {e}" ) if not isinstance(e, HTTPError) or e.response.status_code >= 500: - try: - self.retry(queue=QueueNames.RETRY) - except self.MaxRetriesExceededError: - current_app.logger.exception( - "Retry: send_inbound_sms_to_service has retried the max number of" - + f"times for service: {service_id} and inbound_sms {inbound_sms_id}" - ) + raise else: current_app.logger.warning( f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for "