diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index b14d50ee5..25e4d5034 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -35,7 +35,7 @@ from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
 from app.enums import JobStatus, NotificationType
 from app.models import Job
 from app.notifications.process_notifications import send_notification_to_queue
-from app.utils import hilite, utc_now
+from app.utils import utc_now
 from notifications_utils import aware_utcnow
 from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket
 
@@ -238,23 +238,43 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
     zendesk_client.send_ticket_to_zendesk(ticket)
 
 
-@notify_celery.task(name="process-delivery-receipts")
-def process_delivery_receipts():
-    print(hilite("ENTER PROCESS DELIVERY RECEIPTS"))
-    cloudwatch = AwsCloudwatchClient()
-    cloudwatch.init_app(current_app)
-    start_time = aware_utcnow() - timedelta(minutes=10)
-    end_time = aware_utcnow()
-    delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
-        start_time, end_time
-    )
-    delivered_receipts = list(delivered_receipts)
-    batch_size = 100
-    for i in range(0, len(delivered_receipts), batch_size):
-        batch = delivered_receipts[i : i + batch_size]
-        dao_update_delivery_receipts(batch, True)
-    failed_receipts = list(failed_receipts)
-    batch_size = 100
-    for i in range(0, len(failed_receipts), batch_size):
-        batch = failed_receipts[i : i + batch_size]
-        dao_update_delivery_receipts(batch, False)
+@notify_celery.task(
+    bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
+)
+def process_delivery_receipts(self):
+    """
+    Every eight minutes or so (see config.py) we run this task, which searches the last ten
+    minutes of logs for delivery receipts and batch-updates the db with the results. The overlap
+    is intentional: we don't mind re-updating receipts, because that is better than losing data.
+
+    We also set this task to retry with exponential backoff in the case of failure. The only way it
+    should fail is if, for example, the db went down or redis filled up, causing the app to stop
+    processing. If it does fail, we need to go back over those logs once things are running again.
+    """
+    try:
+        batch_size = 200  # in theory with postgresql this could be 10k to 20k?
+
+        cloudwatch = AwsCloudwatchClient()
+        cloudwatch.init_app(current_app)
+        start_time = aware_utcnow() - timedelta(minutes=10)
+        end_time = aware_utcnow()
+        delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
+            start_time, end_time
+        )
+        delivered_receipts = list(delivered_receipts)
+        for i in range(0, len(delivered_receipts), batch_size):
+            batch = delivered_receipts[i : i + batch_size]
+            dao_update_delivery_receipts(batch, True)
+        failed_receipts = list(failed_receipts)
+        for i in range(0, len(failed_receipts), batch_size):
+            batch = failed_receipts[i : i + batch_size]
+            dao_update_delivery_receipts(batch, False)
+    except Exception as ex:
+        retry_count = self.request.retries
+        wait_time = 3600 * 2**retry_count
+        try:
+            raise self.retry(exc=ex, countdown=wait_time)
+        except self.MaxRetriesExceededError:
+            current_app.logger.error(
+                "Failed to process delivery receipts after max retries"
+            )
diff --git a/app/clients/cloudwatch/aws_cloudwatch.py b/app/clients/cloudwatch/aws_cloudwatch.py
index d84044943..297052741 100644
--- a/app/clients/cloudwatch/aws_cloudwatch.py
+++ b/app/clients/cloudwatch/aws_cloudwatch.py
@@ -3,6 +3,7 @@ import os
 import re
 
 from boto3 import client
+from flask import current_app
 
 from app.clients import AWS_CLIENT_CONFIG, Client
 from app.cloudfoundry_config import cloud_config
@@ -136,13 +137,13 @@ class AwsCloudwatchClient(Client):
     # and run this on a schedule.
     def check_delivery_receipts(self, start, end):
         region = cloud_config.sns_region
-        # TODO this clumsy approach to getting the account number will be fixed as part of notify-api #258
         account_number = self._extract_account_number(cloud_config.ses_domain_arn)
 
         delivered_event_set = set()
         log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
-        print(hilite(f"LOG GROUP NAME {log_group_name}"))
         all_delivered_events = self._get_log(log_group_name, start, end)
-        print(f"ALL DELIVEREDS {len(all_delivered_events)}")
+        current_app.logger.info(
+            f"Delivered count {len(all_delivered_events)} over range {start} to {end}"
+        )
         for event in all_delivered_events:
             actual_event = self.event_to_db_format(event["message"])
@@ -153,7 +154,9 @@ class AwsCloudwatchClient(Client):
             f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failure"
         )
         all_failed_events = self._get_log(log_group_name, start, end)
-        print(f"ALL FAILEDS {len(all_failed_events)}")
+        current_app.logger.info(
+            f"Failed count {len(all_failed_events)} over range {start} to {end}"
+        )
         for event in all_failed_events:
             actual_event = self.event_to_db_format(event["message"])
             failed_event_set.add(json.dumps(actual_event))
diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py
index ef338dc4d..6ce97b801 100644
--- a/app/dao/notifications_dao.py
+++ b/app/dao/notifications_dao.py
@@ -730,16 +730,6 @@ def dao_update_delivery_receipts(receipts, delivered):
         new_receipts.append(r)
 
     receipts = new_receipts
-    print(receipts)
-
-    statuses = {}
-    for r in receipts:
-        if r["status"].lower() == "success":
-            statuses[r["notification.messageId"]] = NotificationStatus.DELIVERED
-        else:
-            statuses[r["notification.messageId"]] = NotificationStatus.FAILED
-
-    print(f"HERE ARE STATUSES {statuses}")
 
     id_to_carrier = {
         r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts
@@ -777,6 +767,5 @@ def dao_update_delivery_receipts(receipts, delivered):
                 ),
             )
         )
-        print(stmt)
         db.session.execute(stmt)
         db.session.commit()
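
As a companion to the retry logic above, a minimal, self-contained sketch of the same
bind/exponential-backoff pattern, assuming a generic Celery app. The "sketch" app, the
broker URL, and flaky_task are illustrative stand-ins (not from this PR), and the raised
ConnectionError is a placeholder for a real failure such as the db going down:

import logging

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # illustrative app/broker
logger = logging.getLogger(__name__)


@app.task(bind=True, max_retries=7, default_retry_delay=3600, name="flaky-task")
def flaky_task(self):
    try:
        raise ConnectionError("db went down")  # stand-in for real work that fails
    except Exception as ex:
        # Waits double each attempt: 3600 * 2**retries gives
        # 1h, 2h, 4h, 8h, 16h, 32h, 64h across the seven retries.
        wait_time = 3600 * 2**self.request.retries
        try:
            # retry() raises Retry to hand control back to the worker; once
            # max_retries is exhausted it raises MaxRetriesExceededError instead.
            raise self.retry(exc=ex, countdown=wait_time)
        except self.MaxRetriesExceededError:
            logger.error("Giving up after max retries")

An explicit countdown passed to retry() overrides default_retry_delay (which only applies
when no countdown is given), so with max_retries=7 the waits run 1h through 64h before
MaxRetriesExceededError ends the cycle.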