This commit is contained in:
Kenneth Kehl
2024-12-13 17:30:00 -08:00
parent b784d33173
commit b28b547762
3 changed files with 51 additions and 54 deletions

View File

@@ -240,6 +240,7 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
@notify_celery.task(name="process-delivery-receipts") @notify_celery.task(name="process-delivery-receipts")
def process_delivery_receipts(): def process_delivery_receipts():
cloudwatch = AwsCloudwatchClient() cloudwatch = AwsCloudwatchClient()
cloudwatch.init_app(current_app)
start_time = utc_now() - timedelta(minutes=10) start_time = utc_now() - timedelta(minutes=10)
end_time = utc_now() end_time = utc_now()
receipts = cloudwatch.check_delivery_receipts(start_time, end_time) receipts = cloudwatch.check_delivery_receipts(start_time, end_time)

View File

@@ -7,7 +7,6 @@ from flask import current_app
from app.clients import AWS_CLIENT_CONFIG, Client from app.clients import AWS_CLIENT_CONFIG, Client
from app.cloudfoundry_config import cloud_config from app.cloudfoundry_config import cloud_config
from app.dao.notifications_dao import dao_update_delivery_receipts
from app.utils import utc_now from app.utils import utc_now
@@ -107,63 +106,60 @@ class AwsCloudwatchClient(Client):
return logline return logline
return None return None
def check_delivery_receipts(self, start, end):
region = cloud_config.sns_region
account_number = self._extract_account_number(cloud_config.ses_domain_arn)
def check_delivery_receipts(self, start, end): log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
region = cloud_config.sns_region log_group_name_failed = (
account_number = self._extract_account_number(cloud_config.ses_domain_arn) f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failed"
)
log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber" query = """
log_group_name_failed = ( fields @timestamp, status, delivery.providerResponse, delivery.destination,
f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failed" notification.messageId, delivery.phoneCarrier
) | sort @timestamp asc
"""
query = """ response = self._client.start_query(
fields @timestamp, status, delivery.providerResponse, delivery.destination, logGroupName=log_group_name,
notification.messageId, delivery.phoneCarrier startTime=int(start.timestamp()),
| sort @timestamp asc endTime=int(end.timestamp()),
""" queryString=query,
)
query_id = response["queryId"]
while True:
result = client._client.get_query_results(queryId=query_id)
if result["status"] == "Complete":
break
sleep(1)
response = client._client.start_query( delivery_receipts = []
logGroupName=log_group_name, for log in result["results"]:
startTime=int(start.timestamp()), receipt = {field["field"]: field["value"] for field in log}
endTime=int(end.timestamp()), delivery_receipts.append(receipt)
queryString=query, print(receipt)
)
query_id = response["queryId"]
while True:
result = client._client.get_query_results(queryId=query_id)
if result["status"] == "Complete":
break
sleep(1)
delivery_receipts = [] delivered = delivery_receipts
for log in result["results"]:
receipt = {field["field"]: field["value"] for field in log}
delivery_receipts.append(receipt)
print(receipt)
delivered = delivery_receipts response = client._client.start_query(
logGroupName=log_group_name_failed,
startTime=int(start.timestamp()),
endTime=int(end.timestamp()),
queryString=query,
)
query_id = response["queryId"]
while True:
result = client._client.get_query_results(queryId=query_id)
if result["status"] == "Complete":
break
sleep(1)
response = client._client.start_query( delivery_receipts = []
logGroupName=log_group_name_failed, for log in result["results"]:
startTime=int(start.timestamp()), receipt = {field["field"]: field["value"] for field in log}
endTime=int(end.timestamp()), delivery_receipts.append(receipt)
queryString=query, print(receipt)
)
query_id = response["queryId"]
while True:
result = client._client.get_query_results(queryId=query_id)
if result["status"] == "Complete":
break
sleep(1)
delivery_receipts = [] failed = delivery_receipts
for log in result["results"]: return delivered + failed
receipt = {field["field"]: field["value"] for field in log}
delivery_receipts.append(receipt)
print(receipt)
failed = delivery_receipts
dao_update_delivery_receipts(delivered)
dao_update_delivery_receipts(failed)

View File

@@ -198,7 +198,7 @@ class Config(object):
}, },
"process-delivery-receipts": { "process-delivery-receipts": {
"task": "process-delivery-receipts", "task": "process-delivery-receipts",
"schedule": timedelta(minutes=8), "schedule": timedelta(minutes=1),
"options": {"queue": QueueNames.PERIODIC}, "options": {"queue": QueueNames.PERIODIC},
}, },
"expire-or-delete-invitations": { "expire-or-delete-invitations": {