mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-10 15:22:24 -05:00
switch to log insights for delivery receipts
This commit is contained in:
@@ -11,6 +11,7 @@ from app.celery.tasks import (
|
||||
process_job,
|
||||
process_row,
|
||||
)
|
||||
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
|
||||
from app.config import QueueNames
|
||||
from app.dao.invited_org_user_dao import (
|
||||
delete_org_invitations_created_more_than_two_days_ago,
|
||||
@@ -231,3 +232,11 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
|
||||
technical_ticket=True,
|
||||
)
|
||||
zendesk_client.send_ticket_to_zendesk(ticket)
|
||||
|
||||
|
||||
@notify_celery.task(name="process_delivery_receipts_first_wave")
|
||||
def process_delivery_receipts_first_wave():
|
||||
cloudwatch = AwsCloudwatchClient()
|
||||
start_time = utc_now() - timedelta(hours=1)
|
||||
end_time = utc_now()
|
||||
receipts = cloudwatch.check_delivery_receipts(start_time, end_time)
|
||||
|
||||
@@ -2,12 +2,14 @@ import json
|
||||
import os
|
||||
import re
|
||||
from datetime import timedelta
|
||||
from time import sleep
|
||||
|
||||
from boto3 import client
|
||||
from flask import current_app
|
||||
|
||||
from app.clients import AWS_CLIENT_CONFIG, Client
|
||||
from app.cloudfoundry_config import cloud_config
|
||||
from app.dao.notifications_dao import dao_update_delivery_receipts
|
||||
from app.exceptions import NotificationTechnicalFailureException
|
||||
from app.utils import hilite, utc_now
|
||||
|
||||
@@ -108,6 +110,7 @@ class AwsCloudwatchClient(Client):
|
||||
return logline
|
||||
return None
|
||||
|
||||
# DEPRECATED
|
||||
def check_sms(self, message_id, notification_id, created_at):
|
||||
region = cloud_config.sns_region
|
||||
# TODO this clumsy approach to getting the account number will be fixed as part of notify-api #258
|
||||
@@ -165,3 +168,65 @@ class AwsCloudwatchClient(Client):
|
||||
raise NotificationTechnicalFailureException(
|
||||
f"No event found for message_id {message_id} notification_id {notification_id}"
|
||||
)
|
||||
|
||||
|
||||
def do_log_insights(self):
|
||||
region = cloud_config.sns_region
|
||||
account_number = self._extract_account_number(cloud_config.ses_domain_arn)
|
||||
|
||||
log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
|
||||
log_group_name_failed = (
|
||||
f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failed"
|
||||
)
|
||||
|
||||
query = """
|
||||
fields @timestamp, status, delivery.providerResponse, delivery.destination, notification.messageId, delivery.phoneCarrier
|
||||
| sort @timestamp asc
|
||||
"""
|
||||
start = utc_now() - timedelta(hours=1)
|
||||
end = utc_now()
|
||||
|
||||
response = client._client.start_query(
|
||||
logGroupName=log_group_name,
|
||||
startTime=int(start.timestamp()),
|
||||
endTime=int(end.timestamp()),
|
||||
queryString=query,
|
||||
)
|
||||
query_id = response["queryId"]
|
||||
while True:
|
||||
result = client._client.get_query_results(queryId=query_id)
|
||||
if result["status"] == "Complete":
|
||||
break
|
||||
sleep(1)
|
||||
|
||||
delivery_receipts = []
|
||||
for log in result["results"]:
|
||||
receipt = {field["field"]: field["value"] for field in log}
|
||||
delivery_receipts.append(receipt)
|
||||
print(receipt)
|
||||
|
||||
delivered = delivery_receipts
|
||||
|
||||
response = client._client.start_query(
|
||||
logGroupName=log_group_name_failed,
|
||||
startTime=int(start.timestamp()),
|
||||
endTime=int(end.timestamp()),
|
||||
queryString=query,
|
||||
)
|
||||
query_id = response["queryId"]
|
||||
while True:
|
||||
result = client._client.get_query_results(queryId=query_id)
|
||||
if result["status"] == "Complete":
|
||||
break
|
||||
sleep(1)
|
||||
|
||||
delivery_receipts = []
|
||||
for log in result["results"]:
|
||||
receipt = {field["field"]: field["value"] for field in log}
|
||||
delivery_receipts.append(receipt)
|
||||
print(receipt)
|
||||
|
||||
failed = delivery_receipts
|
||||
|
||||
dao_update_delivery_receipts(delivered)
|
||||
dao_update_delivery_receipts(failed)
|
||||
|
||||
@@ -707,3 +707,27 @@ def get_service_ids_with_notifications_on_date(notification_type, date):
|
||||
union(notification_table_query, ft_status_table_query).subquery()
|
||||
).distinct()
|
||||
}
|
||||
|
||||
|
||||
def dao_update_delivery_receipts(receipts):
|
||||
id_to_status = {r["notification.messageId"]: r["status"] for r in receipts}
|
||||
id_to_carrier = {
|
||||
r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts
|
||||
}
|
||||
id_to_provider_response = {
|
||||
r["notification.messageId"]: r["delivery.providerResponse"] for r in receipts
|
||||
}
|
||||
id_to_timestamp = {r["notification.messageId"]: r["@timestamp"] for r in receipts}
|
||||
|
||||
stmt = (
|
||||
update(Notification)
|
||||
.where(Notification.c.message_id.in_(id_to_carrier.keys()))
|
||||
.values(
|
||||
carrier=case(id_to_carrier),
|
||||
status=case(id_to_status),
|
||||
notification_status=case(id_to_status),
|
||||
sent_at=case(id_to_timestamp),
|
||||
)
|
||||
)
|
||||
db.session.execute(stmt)
|
||||
db.session.commit()
|
||||
|
||||
Reference in New Issue
Block a user