Restructured retry handling across Celery tasks: replaced manual self.retry() calls with autoretry_for on the task decorators, and moved MaxRetriesExceededError handling into dedicated decorator wrappers.

Signed-off-by: Cliff Hill <Clifford.hill@gsa.gov>
This commit is contained in:
Cliff Hill
2024-12-03 11:26:15 -05:00
parent fe033b0d7b
commit d6e5b6730e
4 changed files with 219 additions and 111 deletions

View File

@@ -1,7 +1,6 @@
from datetime import timedelta
import iso8601
from celery.exceptions import Retry
from flask import current_app, json
from sqlalchemy.orm.exc import NoResultFound
@@ -26,7 +25,11 @@ from app.utils import utc_now
@notify_celery.task(
bind=True, name="process-ses-result", max_retries=5, default_retry_delay=300
bind=True,
name="process-ses-result",
max_retries=5,
default_retry_delay=300,
autoretry_for=Exception,
)
def process_ses_results(self, response):
try:
@@ -65,7 +68,7 @@ def process_ses_results(self, response):
f"Callback may have arrived before notification was"
f"persisted to the DB. Adding task to retry queue"
)
self.retry(queue=QueueNames.RETRY)
raise
else:
current_app.logger.warning(
f"Notification not found for reference: {reference} "
@@ -110,12 +113,9 @@ def process_ses_results(self, response):
return True
except Retry:
raise
except Exception:
current_app.logger.exception("Error processing SES results")
self.retry(queue=QueueNames.RETRY)
raise
def determine_notification_bounce_type(ses_message):

View File

@@ -1,6 +1,8 @@
import json
import logging
import os
from datetime import timedelta
from functools import wraps
from botocore.exceptions import ClientError
from flask import current_app
@@ -31,6 +33,7 @@ DELIVERY_RECEIPT_DELAY_IN_SECONDS = 30
name="check_sms_delivery_receipt",
max_retries=48,
default_retry_delay=300,
autoretry_for=(NotificationTechnicalFailureException, ClientError),
)
def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
"""
@@ -52,7 +55,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
status, provider_response, carrier = aws_cloudwatch_client.check_sms(
message_id, notification_id, sent_at
)
except NotificationTechnicalFailureException as ntfe:
except NotificationTechnicalFailureException:
provider_response = "Unable to find carrier response -- still looking"
status = "pending"
carrier = ""
@@ -62,7 +65,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=ntfe)
raise
except ClientError as err:
# Probably a ThrottlingException but could be something else
error_code = err.response["Error"]["Code"]
@@ -77,7 +80,7 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
carrier=carrier,
provider_response=provider_response,
)
raise self.retry(exc=err)
raise
if status == "success":
status = NotificationStatus.DELIVERED
@@ -104,8 +107,36 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
)
def _deliver_sms_task_handler(func):
"""Handle the max retries exceeded error case for delivering sms notifications."""
@wraps(func)
def deliver_sms_task_wrapper(self, notification_id):
try:
return func(self, notification_id)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
)
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
)
raise NotificationTechnicalFailureException(message)
return deliver_sms_task_wrapper
@_deliver_sms_task_handler
@notify_celery.task(
bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300
bind=True,
name="deliver_sms",
max_retries=48,
default_retry_delay=300,
autoretry_for=Exception,
)
def deliver_sms(self, notification_id):
"""Branch off to the final step in delivering the notification to sns and get delivery receipts."""
@@ -141,26 +172,35 @@ def deliver_sms(self, notification_id):
notification_id,
NotificationStatus.TEMPORARY_FAILURE,
)
if isinstance(e, SmsClientResponseException):
current_app.logger.warning(
"SMS notification delivery for id: {} failed".format(notification_id),
)
else:
current_app.logger.exception(
"SMS notification delivery for id: {} failed".format(notification_id),
)
if isinstance(e, SmsClientResponseException):
log_lvl = logging.WARNING
log_exc_info = False
else:
log_lvl = logging.ERROR
log_exc_info = True
current_app.logger.log(
level=log_lvl,
msg=f"SMS notification delivery for id: {notification_id} failed",
exc_info=log_exc_info,
)
raise
def _deliver_email_task_handler(func):
"""Handle the max retries exceeded error case for delivering email notifications."""
@wraps(func)
def deliver_email_task_wrapper(self, notification_id):
try:
if self.request.retries == 0:
self.retry(queue=QueueNames.RETRY, countdown=0)
else:
self.retry(queue=QueueNames.RETRY)
return func(self, notification_id)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
"RETRY FAILED: Max retries reached. "
f"The task send_email_to_provider failed for notification {notification_id}. "
"Notification has been updated to technical-failure"
)
update_notification_status_by_id(
notification_id,
@@ -168,9 +208,17 @@ def deliver_sms(self, notification_id):
)
raise NotificationTechnicalFailureException(message)
return deliver_email_task_wrapper
@_deliver_email_task_handler
@notify_celery.task(
bind=True, name="deliver_email", max_retries=48, default_retry_delay=30
bind=True,
name="deliver_email",
max_retries=48,
default_retry_delay=30,
autoretry_for=Exception,
dont_autoretry_for=EmailClientNonRetryableException,
)
def deliver_email(self, notification_id):
try:
@@ -191,29 +239,19 @@ def deliver_email(self, notification_id):
send_to_providers.send_email_to_provider(notification)
except EmailClientNonRetryableException:
current_app.logger.exception(f"Email notification {notification_id} failed")
update_notification_status_by_id(notification_id, "technical-failure")
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
)
raise
except Exception as e:
try:
if isinstance(e, AwsSesClientThrottlingSendRateException):
current_app.logger.warning(
f"RETRY: Email notification {notification_id} was rate limited by SES"
)
else:
current_app.logger.exception(
f"RETRY: Email notification {notification_id} failed"
)
if isinstance(e, AwsSesClientThrottlingSendRateException):
current_app.logger.warning(
f"RETRY: Email notification {notification_id} was rate limited by SES"
)
else:
current_app.logger.exception(
f"RETRY: Email notification {notification_id} failed"
)
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
message = (
"RETRY FAILED: Max retries reached. "
"The task send_email_to_provider failed for notification {}. "
"Notification has been updated to technical-failure".format(
notification_id
)
)
update_notification_status_by_id(
notification_id,
NotificationStatus.TECHNICAL_FAILURE,
)
raise NotificationTechnicalFailureException(message)
raise

View File

@@ -1,13 +1,57 @@
import json
from functools import wraps
from inspect import signature
from flask import current_app
from requests import HTTPError, RequestException, request
from app import encryption, notify_celery
from app.config import QueueNames
from app.utils import DATETIME_FORMAT
def _send_to_service_task_handler(func):
@wraps(func)
def send_to_service_task_wrapper(*args, **kwargs):
sig = signature(func)
bargs = sig.bind(*args, **kwargs)
bargs.apply_defaults()
function_name = func.__name__
if function_name == "send_delivery_status_to_service":
encrypted_status_update = bargs.arguments["encrypted_status_update"]
status_update = encryption.decrypt(encrypted_status_update)
service_callback_url = status_update["service_callback_api_url"]
notification_id = bargs.arguments["notification_id"]
elif function_name == "send_complaint_to_service":
complaint_data = bargs.arguments["complaint_data"]
notification_id = complaint_data["notification_id"]
service_callback_url = complaint_data["service_callback_api_url"]
else:
raise ValueError(
f"Incorrect send to service function name found: {function_name}"
)
self_ = bargs.arguments["self"]
try:
return func(*args, **kwargs)
except self_.MaxRetriesExceededError:
current_app.logger.warning(
f"Retry: {function_name} has retried the max num of times for callback url "
f"{service_callback_url} and notification_id: {notification_id}"
)
raise
return send_to_service_task_wrapper
@_send_to_service_task_handler
@notify_celery.task(
bind=True, name="send-delivery-status", max_retries=5, default_retry_delay=300
)
@@ -36,6 +80,7 @@ def send_delivery_status_to_service(self, notification_id, encrypted_status_upda
)
@_send_to_service_task_handler
@notify_celery.task(
bind=True, name="send-complaint", max_retries=5, default_retry_delay=300
)
@@ -72,43 +117,29 @@ def _send_data_to_service_callback_api(
data=json.dumps(data),
headers={
"Content-Type": "application/json",
"Authorization": "Bearer {}".format(token),
"Authorization": f"Bearer {token}",
},
timeout=5,
)
current_app.logger.info(
"{} sending {} to {}, response {}".format(
function_name,
notification_id,
service_callback_url,
response.status_code,
)
f"{function_name} sending {notification_id} to {service_callback_url}, response {response.status_code}"
)
response.raise_for_status()
except RequestException as e:
current_app.logger.warning(
"{} request failed for notification_id: {} and url: {}. exception: {}".format(
function_name, notification_id, service_callback_url, e
)
f"{function_name} request failed for notification_id: {notification_id} and "
f"url: {service_callback_url}. exception: {e}"
)
if (
not isinstance(e, HTTPError)
or e.response.status_code >= 500
or e.response.status_code == 429
):
try:
self.retry(queue=QueueNames.CALLBACKS_RETRY)
except self.MaxRetriesExceededError:
current_app.logger.warning(
"Retry: {} has retried the max num of times for callback url {} and notification_id: {}".format(
function_name, service_callback_url, notification_id
)
)
raise
else:
current_app.logger.warning(
"{} callback is not being retried for notification_id: {} and url: {}. exception: {}".format(
function_name, notification_id, service_callback_url, e
)
f"{function_name} callback is not being retried for notification_id: "
f"{notification_id} and url: {service_callback_url}. exception: {e}"
)

View File

@@ -1,4 +1,6 @@
import json
from functools import wraps
from inspect import signature
from flask import current_app
from requests import HTTPError, RequestException, request
@@ -10,10 +12,7 @@ from app.celery import provider_tasks
from app.config import QueueNames
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
from app.dao.notifications_dao import (
dao_get_last_notification_added_for_job_id,
get_notification_by_id,
)
from app.dao.notifications_dao import dao_get_last_notification_added_for_job_id
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
@@ -166,6 +165,31 @@ def __total_sending_limits_for_job_exceeded(service, job, job_id):
return True
def _save_task_hander(func):
    """Log (and re-raise) when a save-notification task exhausts its retries.

    Decorated tasks must accept ``self``, ``notification_id`` and
    ``encrypted_notification`` parameters (bound by name via the signature).
    """

    @wraps(func)
    def save_task_wrapper(*args, **kwargs):
        sig = signature(func)
        bargs = sig.bind(*args, **kwargs)
        bargs.apply_defaults()
        task = bargs.arguments["self"]
        notification_id = bargs.arguments["notification_id"]
        notification = encryption.decrypt(bargs.arguments["encrypted_notification"])
        try:
            return func(*args, **kwargs)
        except task.MaxRetriesExceededError:
            # Bug fix: the original closed the parenthesis after the first
            # f-string, leaving the second f-string as a dangling no-op
            # statement, so the logged message was truncated. Also avoid
            # nested double quotes inside f-strings (SyntaxError < 3.12).
            retry_msg = (
                f"{task.__name__} notification for job {notification.get('job', None)} "
                f"row number {notification.get('row_number', None)} "
                f"and notification id {notification_id}"
            )
            current_app.logger.exception("Max retry failed " + retry_msg)
            raise

    return save_task_wrapper
@_save_task_hander
@notify_celery.task(bind=True, name="save-sms", max_retries=5, default_retry_delay=300)
def save_sms(self, service_id, notification_id, encrypted_notification, sender_id=None):
"""Persist notification to db and place notification in queue to send to sns."""
@@ -242,10 +266,16 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)
)
except SQLAlchemyError as e:
handle_exception(self, notification, notification_id, e)
except SQLAlchemyError:
retry_msg = (
f"{self.__name__} notification for job {notification.get("job", None)} "
)
f"row number {notification.get("row_number", None)} and notification id {notification_id}"
current_app.logger.exception(retry_msg)
raise
@_save_task_hander
@notify_celery.task(
bind=True, name="save-email", max_retries=5, default_retry_delay=300
)
@@ -298,10 +328,35 @@ def save_email(
saved_notification.id, saved_notification.created_at
)
)
except SQLAlchemyError as e:
handle_exception(self, notification, notification_id, e)
except SQLAlchemyError:
retry_msg = (
f"{self.__name__} notification for job {notification.get("job", None)} "
f"row number {notification.get("row_number", None)} and notification id {notification_id}"
)
current_app.logger.exception(retry_msg)
raise
def _save_api_task_handler(func):
    """Log when a save-api task exhausts its retries; the error is swallowed
    (matching the previous behavior, where max-retry was logged but not re-raised).

    Decorated tasks must accept ``self`` and ``encrypted_notification``
    parameters (bound by name via the signature).
    """

    @wraps(func)
    def save_api_task_wrapper(*args, **kwargs):
        sig = signature(func)
        bargs = sig.bind(*args, **kwargs)
        bargs.apply_defaults()
        # Bug fix: BoundArguments has no ``argument`` attribute — it is
        # ``arguments`` — and ``encryption.decrypt`` is a call, not a
        # subscript. As written, every invocation raised AttributeError.
        self_ = bargs.arguments["self"]
        notification = encryption.decrypt(bargs.arguments["encrypted_notification"])
        try:
            return func(*args, **kwargs)
        except self_.MaxRetriesExceededError:
            current_app.logger.exception(
                f"Max retry failed Failed to persist notification {notification['id']}",
            )

    return save_api_task_wrapper
@_save_api_task_handler
@notify_celery.task(
bind=True, name="save-api-email", max_retries=5, default_retry_delay=300
)
@@ -309,6 +364,7 @@ def save_api_email(self, encrypted_notification):
save_api_email_or_sms(self, encrypted_notification)
@_save_api_task_handler
@notify_celery.task(
bind=True, name="save-api-sms", max_retries=5, default_retry_delay=300
)
@@ -360,34 +416,23 @@ def save_api_email_or_sms(self, encrypted_notification):
# up retrying because IntegrityError is a subclass of SQLAlchemyError
return
except SQLAlchemyError:
def _send_inbound_sms_to_service_handler(func):
@wraps(func)
def send_inbound_sms_to_service_wrapper(self, inbound_sms_id, service_id):
try:
self.retry(queue=QueueNames.RETRY)
return func(self, inbound_sms_id, service_id)
except self.MaxRetriesExceededError:
current_app.logger.exception(
f"Max retry failed Failed to persist notification {notification['id']}",
"Retry: send_inbound_sms_to_service has retried the max number of"
+ f"times for service: {service_id} and inbound_sms {inbound_sms_id}"
)
raise
return send_inbound_sms_to_service_wrapper
def handle_exception(task, notification, notification_id, exc):
if not get_notification_by_id(notification_id):
retry_msg = "{task} notification for job {job} row number {row} and notification id {noti}".format(
task=task.__name__,
job=notification.get("job", None),
row=notification.get("row_number", None),
noti=notification_id,
)
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
# send to the retry queue.
# This probably (hopefully) is not an issue with Redis as the celery backing store
current_app.logger.exception("Retry" + retry_msg)
try:
task.retry(queue=QueueNames.RETRY, exc=exc)
except task.MaxRetriesExceededError:
current_app.logger.exception("Max retry failed" + retry_msg)
@_send_inbound_sms_to_service_handler
@notify_celery.task(
bind=True, name="send-inbound-sms", max_retries=5, default_retry_delay=300
)
@@ -431,13 +476,7 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
+ f"and url: {inbound_api.url}. exception: {e}"
)
if not isinstance(e, HTTPError) or e.response.status_code >= 500:
try:
self.retry(queue=QueueNames.RETRY)
except self.MaxRetriesExceededError:
current_app.logger.exception(
"Retry: send_inbound_sms_to_service has retried the max number of"
+ f"times for service: {service_id} and inbound_sms {inbound_sms_id}"
)
raise
else:
current_app.logger.warning(
f"send_inbound_sms_to_service is not being retried for service_id: {service_id} for "