clean up

Commit to GSA/notifications-api (https://github.com/GSA/notifications-api.git).
@@ -35,7 +35,7 @@ from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
 from app.enums import JobStatus, NotificationType
 from app.models import Job
 from app.notifications.process_notifications import send_notification_to_queue
-from app.utils import hilite, utc_now
+from app.utils import utc_now
 from notifications_utils import aware_utcnow
 from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket
 
@@ -238,23 +238,43 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
         zendesk_client.send_ticket_to_zendesk(ticket)
 
 
-@notify_celery.task(name="process-delivery-receipts")
-def process_delivery_receipts():
-    print(hilite("ENTER PROCESS DELIVERY RECEIPTS"))
-    cloudwatch = AwsCloudwatchClient()
-    cloudwatch.init_app(current_app)
-    start_time = aware_utcnow() - timedelta(minutes=10)
-    end_time = aware_utcnow()
-    delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
-        start_time, end_time
-    )
-    delivered_receipts = list(delivered_receipts)
-    batch_size = 100
-    for i in range(0, len(delivered_receipts), batch_size):
-        batch = delivered_receipts[i : i + batch_size]
-        dao_update_delivery_receipts(batch, True)
-    failed_receipts = list(failed_receipts)
-    batch_size = 100
-    for i in range(0, len(failed_receipts), batch_size):
-        batch = failed_receipts[i : i + batch_size]
-        dao_update_delivery_receipts(batch, False)
+@notify_celery.task(
+    bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
+)
+def process_delivery_receipts(self):
+    """
+    Every eight minutes or so (see config.py) we run this task, which searches the last ten
+    minutes of logs for delivery receipts and batch updates the db with the results. The overlap
+    is intentional; we don't mind re-updating things, as that is better than losing data.
+
+    We also set this to retry with exponential backoff in the case of failure. The only way this would
+    fail is if, for example, the db went down or redis filled, causing the app to stop processing. But if
+    it does fail, we need to go back over it at some point when things are running again and process those results.
+    """
+    try:
+        batch_size = 200  # in theory with postgresql this could be 10k to 20k?
+
+        cloudwatch = AwsCloudwatchClient()
+        cloudwatch.init_app(current_app)
+        start_time = aware_utcnow() - timedelta(minutes=10)
+        end_time = aware_utcnow()
+        delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
+            start_time, end_time
+        )
+        delivered_receipts = list(delivered_receipts)
+        for i in range(0, len(delivered_receipts), batch_size):
+            batch = delivered_receipts[i : i + batch_size]
+            dao_update_delivery_receipts(batch, True)
+        failed_receipts = list(failed_receipts)
+        for i in range(0, len(failed_receipts), batch_size):
+            batch = failed_receipts[i : i + batch_size]
+            dao_update_delivery_receipts(batch, False)
+    except Exception as ex:
+        retry_count = self.request.retries
+        wait_time = 3600 * 2**retry_count
+        try:
+            raise self.retry(exc=ex, countdown=wait_time)
+        except self.MaxRetriesExceededError:
+            current_app.logger.error(
+                "Failed to process delivery receipts after max retries"
+            )
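The backoff here is easy to sanity-check: wait_time = 3600 * 2**retry_count yields countdowns of 1, 2, 4, 8, 16, 32, and 64 hours across the seven allowed retries, so a persistent outage is retried for roughly 127 hours before the task gives up. A minimal self-contained sketch of the same pattern; the app name, broker URL, and do_work stand-in are illustrative and not part of this commit:

import logging

from celery import Celery

# Hypothetical broker URL for the sketch; the real app wires Celery
# through its Flask config instead.
celery_app = Celery("example", broker="redis://localhost:6379/0")
logger = logging.getLogger(__name__)


def do_work():
    """Hypothetical stand-in for the receipt-processing body."""


@celery_app.task(bind=True, max_retries=7, default_retry_delay=3600)
def process_with_backoff(self):
    try:
        do_work()
    except Exception as ex:
        # Double the wait on every retry: 1h, 2h, 4h, ... up to max_retries.
        wait_time = 3600 * 2**self.request.retries
        try:
            # self.retry raises Retry to reschedule this task; once retries
            # are exhausted it raises MaxRetriesExceededError instead.
            raise self.retry(exc=ex, countdown=wait_time)
        except self.MaxRetriesExceededError:
            logger.error("Giving up after %s retries", self.request.retries)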
@@ -3,6 +3,7 @@ import os
 import re
 
 from boto3 import client
+from flask import current_app
 
 from app.clients import AWS_CLIENT_CONFIG, Client
 from app.cloudfoundry_config import cloud_config
@@ -136,13 +137,13 @@ class AwsCloudwatchClient(Client):
     # and run this on a schedule.
     def check_delivery_receipts(self, start, end):
         region = cloud_config.sns_region
-        # TODO this clumsy approach to getting the account number will be fixed as part of notify-api #258
         account_number = self._extract_account_number(cloud_config.ses_domain_arn)
         delivered_event_set = set()
         log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
-        print(hilite(f"LOG GROUP NAME {log_group_name}"))
         all_delivered_events = self._get_log(log_group_name, start, end)
-        print(f"ALL DELIVEREDS {len(all_delivered_events)}")
+        current_app.logger.info(
+            f"Delivered count {len(all_delivered_events)} over range {start} to {end}"
+        )
 
         for event in all_delivered_events:
             actual_event = self.event_to_db_format(event["message"])
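The account_number[4] indexing is less cryptic than it looks if _extract_account_number returns the colon-split parts of the ARN. That helper is not shown in this diff, so the following reconstruction is a guess based only on the standard ARN layout:

def extract_account_number(ses_domain_arn: str) -> list[str]:
    # ARNs have the shape arn:partition:service:region:account-id:resource,
    # so after splitting on ":" the twelve-digit account id sits at index 4.
    return ses_domain_arn.split(":")


# Illustrative ARN, not a real account:
parts = extract_account_number("arn:aws:ses:us-west-2:123456789012:identity/example.gov")
assert parts[4] == "123456789012"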
@@ -153,7 +154,9 @@ class AwsCloudwatchClient(Client):
             f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failure"
         )
         all_failed_events = self._get_log(log_group_name, start, end)
-        print(f"ALL FAILEDS {len(all_failed_events)}")
+        current_app.logger.info(
+            f"Failed count {len(all_failed_events)} over range {start} to {end}"
+        )
         for event in all_failed_events:
             actual_event = self.event_to_db_format(event["message"])
             failed_event_set.add(json.dumps(actual_event))
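_get_log is also outside this diff. A minimal sketch, assuming a helper of this shape, of paging through a CloudWatch Logs group for a time window with boto3's filter_log_events (the function name, region default, and the usage values at the bottom are assumptions; running it needs AWS credentials):

from datetime import datetime, timedelta, timezone

from boto3 import client


def get_log_events(log_group_name, start, end, region="us-west-2"):
    # CloudWatch Logs expects epoch milliseconds, so convert the
    # timezone-aware datetimes up front.
    logs = client("logs", region_name=region)
    kwargs = {
        "logGroupName": log_group_name,
        "startTime": int(start.timestamp() * 1000),
        "endTime": int(end.timestamp() * 1000),
    }
    events = []
    while True:
        response = logs.filter_log_events(**kwargs)
        events.extend(response["events"])
        # filter_log_events pages via nextToken; stop when it is absent.
        if "nextToken" not in response:
            return events
        kwargs["nextToken"] = response["nextToken"]


end = datetime.now(timezone.utc)
events = get_log_events(
    "sns/us-west-2/123456789012/DirectPublishToPhoneNumber",
    end - timedelta(minutes=10),
    end,
)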
@@ -730,16 +730,6 @@ def dao_update_delivery_receipts(receipts, delivered):
             new_receipts.append(r)
 
     receipts = new_receipts
-    print(receipts)
-
-    statuses = {}
-    for r in receipts:
-        if r["status"].lower() == "success":
-            statuses[r["notification.messageId"]] = NotificationStatus.DELIVERED
-        else:
-            statuses[r["notification.messageId"]] = NotificationStatus.FAILED
-
-    print(f"HERE ARE STATUSES {statuses}")
 
     id_to_carrier = {
         r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts
@@ -777,6 +767,5 @@ def dao_update_delivery_receipts(receipts, delivered):
             ),
         )
     )
-    print(stmt)
     db.session.execute(stmt)
     db.session.commit()
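The batching in the task matters because dao_update_delivery_receipts turns each batch into a single statement rather than one round trip per receipt. The dao's real statement is only partially visible above; a self-contained sketch of the same idea, using a pared-down, assumed model and status strings:

from sqlalchemy import String, update
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Notification(Base):
    # Pared-down stand-in for the real model; only the columns the
    # sketch touches are declared.
    __tablename__ = "notifications"
    reference: Mapped[str] = mapped_column(String, primary_key=True)
    status: Mapped[str] = mapped_column(String, default="sending")


def update_receipt_statuses(session: Session, receipts: list[dict], delivered: bool) -> None:
    # One bulk UPDATE per batch, keyed on the SNS message id carried in
    # each receipt, instead of a round trip per row.
    message_ids = [r["notification.messageId"] for r in receipts]
    stmt = (
        update(Notification)
        .where(Notification.reference.in_(message_ids))
        .values(status="delivered" if delivered else "failure")
    )
    session.execute(stmt)
    session.commit()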