From 8edc8b04a719fe13fead201c2fa1b263111210ce Mon Sep 17 00:00:00 2001 From: Cliff Hill Date: Tue, 3 Dec 2024 11:24:09 -0500 Subject: [PATCH] Trying to get autoretry logic to work. Signed-off-by: Cliff Hill --- app/celery/process_ses_receipts_tasks.py | 12 +++++++++-- app/celery/provider_tasks.py | 16 ++++++++++---- app/celery/service_callback_tasks.py | 9 ++++++-- .../celery/test_process_ses_receipts_tasks.py | 21 ++++++++++++++++--- 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index c3d24504c..3fb6983f6 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -27,9 +27,10 @@ from app.utils import utc_now @notify_celery.task( bind=True, name="process-ses-result", + throws=(Exception,), + autoretry_for=(Exception,), max_retries=5, default_retry_delay=300, - autoretry_for=(Exception,), ) def process_ses_results(self, response): try: @@ -58,9 +59,11 @@ def process_ses_results(self, response): reference ) except NoResultFound: + print("&"*80) message_time = iso8601.parse_date(ses_message["mail"]["timestamp"]).replace( tzinfo=None ) + print("&"*80) if utc_now() - message_time < timedelta(minutes=5): current_app.logger.info( f"Notification not found for reference: {reference}" @@ -68,6 +71,7 @@ def process_ses_results(self, response): f"Callback may have arrived before notification was" f"persisted to the DB. Adding task to retry queue" ) + print("&"*80) raise else: current_app.logger.warning( @@ -113,7 +117,11 @@ def process_ses_results(self, response): return True - except Exception: + except Exception as e: + print("^"*80) + print(type(e)) + print(e) + print("^"*80) current_app.logger.exception("Error processing SES results") raise diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index 53021d580..07d2beec1 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -107,9 +107,11 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at): ) -def _deliver_sms_task_handler(func): +def _deliver_sms_task_handler(cls): """Handle the max retries exceeded error case for delivering sms notifications.""" + func = cls.__call__ + @wraps(func) def deliver_sms_task_wrapper(self, notification_id): try: @@ -127,7 +129,9 @@ def _deliver_sms_task_handler(func): ) raise NotificationTechnicalFailureException(message) - return deliver_sms_task_wrapper + cls.__call__ = deliver_sms_task_wrapper + + return cls @_deliver_sms_task_handler @@ -189,9 +193,11 @@ def deliver_sms(self, notification_id): raise -def _deliver_email_task_handler(func): +def _deliver_email_task_handler(cls): """Handle the max retries exceeded error case for delivering email notifications.""" + func = cls.__call__ + @wraps(func) def deliver_email_task_wrapper(self, notification_id): try: @@ -208,7 +214,9 @@ def _deliver_email_task_handler(func): ) raise NotificationTechnicalFailureException(message) - return deliver_email_task_wrapper + cls.__call__ = deliver_email_task_wrapper + + return cls @_deliver_email_task_handler diff --git a/app/celery/service_callback_tasks.py b/app/celery/service_callback_tasks.py index 66fc56f8c..b7946e7dd 100644 --- a/app/celery/service_callback_tasks.py +++ b/app/celery/service_callback_tasks.py @@ -9,7 +9,10 @@ from app import encryption, notify_celery from app.utils import DATETIME_FORMAT -def _send_to_service_task_handler(func): +def _send_to_service_task_handler(cls): + + func = cls.__call__ + @wraps(func) def send_to_service_task_wrapper(*args, **kwargs): sig = signature(func) @@ -48,7 +51,9 @@ def _send_to_service_task_handler(func): ) raise - return send_to_service_task_wrapper + cls.__call__ = send_to_service_task_wrapper + + return cls @_send_to_service_task_handler diff --git a/tests/app/celery/test_process_ses_receipts_tasks.py b/tests/app/celery/test_process_ses_receipts_tasks.py index 226394eeb..82aca2771 100644 --- a/tests/app/celery/test_process_ses_receipts_tasks.py +++ b/tests/app/celery/test_process_ses_receipts_tasks.py @@ -1,6 +1,7 @@ import json from unittest.mock import ANY +import pytest from freezegun import freeze_time from app import encryption @@ -157,7 +158,10 @@ def test_process_ses_results_retry_called(sample_email_template, mocker): mocked = mocker.patch( "app.celery.process_ses_receipts_tasks.process_ses_results.retry" ) - process_ses_results(response=ses_notification_callback(reference="ref1")) + with pytest.raises(Exception): # noqa: B017 + # In order to make this work, we have to suppress the flake8 warning about + # pytest.raises(Exception), which is usually considered a bad thing. + process_ses_results(response=ses_notification_callback(reference="ref1")) assert mocked.call_count != 0 @@ -240,7 +244,7 @@ def test_ses_callback_should_not_update_notification_status_if_already_delivered assert mock_upd.call_count == 0 -def test_ses_callback_should_retry_if_notification_is_new(mocker): +def test_ses_callback_should_retry_if_notification_is_new(client, _notify_db, mocker): mock_retry = mocker.patch( "app.celery.process_ses_receipts_tasks.process_ses_results.retry" ) @@ -248,7 +252,18 @@ def test_ses_callback_should_retry_if_notification_is_new(mocker): "app.celery.process_ses_receipts_tasks.current_app.logger.error" ) with freeze_time("2017-11-17T12:14:03.646Z"): - assert process_ses_results(ses_notification_callback(reference="ref")) is None + try: + assert ( + process_ses_results(ses_notification_callback(reference="ref")) is None + ) + except Exception as e: + import traceback + print(type(e)) + print("*" * 80) + print(e) + print("-" * 80) + print(traceback.format_exc()) + print("-" * 80) assert mock_logger.call_count == 0 assert mock_retry.call_count == 1