mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-15 01:32:20 -05:00
go back to filter log events
This commit is contained in:
@@ -35,7 +35,8 @@ from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
|||||||
from app.enums import JobStatus, NotificationType
|
from app.enums import JobStatus, NotificationType
|
||||||
from app.models import Job
|
from app.models import Job
|
||||||
from app.notifications.process_notifications import send_notification_to_queue
|
from app.notifications.process_notifications import send_notification_to_queue
|
||||||
from app.utils import utc_now
|
from app.utils import hilite, utc_now
|
||||||
|
from notifications_utils import aware_utcnow
|
||||||
from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket
|
from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket
|
||||||
|
|
||||||
MAX_NOTIFICATION_FAILS = 10000
|
MAX_NOTIFICATION_FAILS = 10000
|
||||||
@@ -239,12 +240,21 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
|
|||||||
|
|
||||||
@notify_celery.task(name="process-delivery-receipts")
|
@notify_celery.task(name="process-delivery-receipts")
|
||||||
def process_delivery_receipts():
|
def process_delivery_receipts():
|
||||||
|
print(hilite("ENTER PROCESS DELIVERY RECEIPTS"))
|
||||||
cloudwatch = AwsCloudwatchClient()
|
cloudwatch = AwsCloudwatchClient()
|
||||||
cloudwatch.init_app(current_app)
|
cloudwatch.init_app(current_app)
|
||||||
start_time = utc_now() - timedelta(minutes=10)
|
start_time = aware_utcnow() - timedelta(minutes=10)
|
||||||
end_time = utc_now()
|
end_time = aware_utcnow()
|
||||||
receipts = cloudwatch.check_delivery_receipts(start_time, end_time)
|
delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts(
|
||||||
|
start_time, end_time
|
||||||
|
)
|
||||||
|
delivered_receipts = list(delivered_receipts)
|
||||||
batch_size = 100
|
batch_size = 100
|
||||||
for i in range(0, len(receipts), batch_size):
|
for i in range(0, len(delivered_receipts), batch_size):
|
||||||
batch = receipts[i : i + batch_size]
|
batch = delivered_receipts[i : i + batch_size]
|
||||||
dao_update_delivery_receipts(batch)
|
dao_update_delivery_receipts(batch, True)
|
||||||
|
failed_receipts = list(failed_receipts)
|
||||||
|
batch_size = 100
|
||||||
|
for i in range(0, len(failed_receipts), batch_size):
|
||||||
|
batch = failed_receipts[i : i + batch_size]
|
||||||
|
dao_update_delivery_receipts(batch, False)
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
from boto3 import client
|
from boto3 import client
|
||||||
from flask import current_app
|
|
||||||
|
|
||||||
from app.clients import AWS_CLIENT_CONFIG, Client
|
from app.clients import AWS_CLIENT_CONFIG, Client
|
||||||
from app.cloudfoundry_config import cloud_config
|
from app.cloudfoundry_config import cloud_config
|
||||||
|
from app.utils import hilite
|
||||||
|
|
||||||
|
|
||||||
class AwsCloudwatchClient(Client):
|
class AwsCloudwatchClient(Client):
|
||||||
@@ -46,63 +46,116 @@ class AwsCloudwatchClient(Client):
|
|||||||
def is_localstack(self):
|
def is_localstack(self):
|
||||||
return self._is_localstack
|
return self._is_localstack
|
||||||
|
|
||||||
|
def _get_log(self, log_group_name, start, end):
    """Return every CloudWatch log event in ``log_group_name`` from ``start`` to ``end``.

    ``start`` and ``end`` only need a ``.timestamp()`` method (datetime-like);
    they are converted to epoch milliseconds for ``filter_log_events``.
    The call is repeated with the returned ``nextToken`` until a response
    no longer carries one.

    Returns a list of the raw event dicts found under each response's
    ``"events"`` key, in the order the pages were returned.
    """
    print(hilite(f"START {start} END {end}"))

    # The time window is the same for every page, so build the request once
    # and only add/replace the pagination token between pages.
    request = {
        "logGroupName": log_group_name,
        "startTime": int(start.timestamp() * 1000),
        "endTime": int(end.timestamp() * 1000),
    }
    all_log_events = []

    while True:
        response = self._client.filter_log_events(**request)
        all_log_events.extend(response.get("events", []))

        next_token = response.get("nextToken")
        if not next_token:
            break
        request["nextToken"] = next_token

    return all_log_events
|
||||||
|
|
||||||
def _extract_account_number(self, ses_domain_arn):
|
def _extract_account_number(self, ses_domain_arn):
|
||||||
account_number = ses_domain_arn.split(":")
|
account_number = ses_domain_arn.split(":")
|
||||||
return account_number
|
return account_number
|
||||||
|
|
||||||
def warn_if_dev_is_opted_out(self, provider_response, notification_id):
|
def event_to_db_format(self, event):
|
||||||
if (
|
|
||||||
"is opted out" in provider_response.lower()
|
|
||||||
or "has blocked sms" in provider_response.lower()
|
|
||||||
):
|
|
||||||
if os.getenv("NOTIFY_ENVIRONMENT") in ["development", "test"]:
|
|
||||||
ansi_red = "\033[31m"
|
|
||||||
ansi_reset = "\033[0m"
|
|
||||||
logline = (
|
|
||||||
ansi_red
|
|
||||||
+ f"The phone number for notification_id {notification_id} is OPTED OUT. You need to opt back in"
|
|
||||||
+ ansi_reset
|
|
||||||
)
|
|
||||||
current_app.logger.warning(logline)
|
|
||||||
return logline
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
# massage the data into the form the db expects. When we switch
|
||||||
|
# from filter_log_events to log insights this will be convenient
|
||||||
|
if isinstance(event, str):
|
||||||
|
event = json.loads(event)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"notification.messageId": event["notification"]["messageId"],
|
||||||
|
"status": event["status"],
|
||||||
|
"delivery.phoneCarrier": event["delivery"]["phoneCarrier"],
|
||||||
|
"delivery.providerResponse": event["delivery"]["providerResponse"],
|
||||||
|
"@timestamp": event["notification"]["timestamp"],
|
||||||
|
}
|
||||||
|
|
||||||
|
# Here is an example of how to get the events with log insights
|
||||||
|
# def do_log_insights():
|
||||||
|
# query = """
|
||||||
|
# fields @timestamp, status, message, recipient
|
||||||
|
# | filter status = "DELIVERED"
|
||||||
|
# | sort @timestamp asc
|
||||||
|
# """
|
||||||
|
# temp_client = boto3.client(
|
||||||
|
# "logs",
|
||||||
|
# region_name="us-gov-west-1",
|
||||||
|
# aws_access_key_id=AWS_ACCESS_KEY_ID,
|
||||||
|
# aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
|
||||||
|
# config=AWS_CLIENT_CONFIG,
|
||||||
|
# )
|
||||||
|
# end = utc_now()
|
||||||
|
# start = end - timedelta(hours=1)
|
||||||
|
# response = temp_client.start_query(
|
||||||
|
# logGroupName = LOG_GROUP_NAME_DELIVERED,
|
||||||
|
# startTime = int(start.timestamp()),
|
||||||
|
# endTime= int(end.timestamp()),
|
||||||
|
# queryString = query
|
||||||
|
|
||||||
|
# )
|
||||||
|
# query_id = response['queryId']
|
||||||
|
# while True:
|
||||||
|
# result = temp_client.get_query_results(queryId=query_id)
|
||||||
|
# if result['status'] == 'Complete':
|
||||||
|
# break
|
||||||
|
# time.sleep(1)
|
||||||
|
|
||||||
|
# delivery_receipts = []
|
||||||
|
# for log in result['results']:
|
||||||
|
# receipt = {field['field']: field['value'] for field in log}
|
||||||
|
# delivery_receipts.append(receipt)
|
||||||
|
# print(receipt)
|
||||||
|
|
||||||
|
# print(len(delivery_receipts))
|
||||||
|
|
||||||
|
# In the long run we want to use Log Insights because it is more efficient
|
||||||
|
# than filter_log_events. But we are blocked by a permissions issue in the broker.
|
||||||
|
# So for now, use filter_log_events and grab all log_events over a 10 minute interval,
|
||||||
|
# and run this on a schedule.
|
||||||
def check_delivery_receipts(self, start, end):
|
def check_delivery_receipts(self, start, end):
|
||||||
|
|
||||||
region = cloud_config.sns_region
|
region = cloud_config.sns_region
|
||||||
|
# TODO this clumsy approach to getting the account number will be fixed as part of notify-api #258
|
||||||
account_number = self._extract_account_number(cloud_config.ses_domain_arn)
|
account_number = self._extract_account_number(cloud_config.ses_domain_arn)
|
||||||
|
delivered_event_set = set()
|
||||||
log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
|
log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
|
||||||
log_group_name_failed = (
|
print(hilite(f"LOG GROUP NAME {log_group_name}"))
|
||||||
f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failed"
|
all_delivered_events = self._get_log(log_group_name, start, end)
|
||||||
|
print(f"ALL DELIVEREDS {len(all_delivered_events)}")
|
||||||
|
|
||||||
|
for event in all_delivered_events:
|
||||||
|
actual_event = self.event_to_db_format(event["message"])
|
||||||
|
delivered_event_set.add(json.dumps(actual_event))
|
||||||
|
|
||||||
|
failed_event_set = set()
|
||||||
|
log_group_name = (
|
||||||
|
f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failure"
|
||||||
)
|
)
|
||||||
|
all_failed_events = self._get_log(log_group_name, start, end)
|
||||||
|
print(f"ALL FAILEDS {len(all_failed_events)}")
|
||||||
|
for event in all_failed_events:
|
||||||
|
actual_event = self.event_to_db_format(event["message"])
|
||||||
|
failed_event_set.add(json.dumps(actual_event))
|
||||||
|
|
||||||
query = """
|
return delivered_event_set, failed_event_set
|
||||||
fields @timestamp, status, delivery.providerResponse, delivery.destination,
|
|
||||||
notification.messageId, delivery.phoneCarrier
|
|
||||||
| sort @timestamp asc
|
|
||||||
"""
|
|
||||||
|
|
||||||
delivered = self.run_log_insights_query(log_group_name, start, end, query)
|
|
||||||
failed = self.run_log_insights_query(log_group_name_failed, start, end, query)
|
|
||||||
return delivered + failed
|
|
||||||
|
|
||||||
def run_log_insights_query(self, log_group_name, start, end, query):
|
|
||||||
response = self._client.start_query(
|
|
||||||
logGroupName=log_group_name,
|
|
||||||
startTime=int(start.timestamp()),
|
|
||||||
endTime=int(end.timestamp()),
|
|
||||||
queryString=query,
|
|
||||||
)
|
|
||||||
query_id = response["queryId"]
|
|
||||||
while True:
|
|
||||||
result = self._client.get_query_results(queryId=query_id)
|
|
||||||
if result["status"] == "Complete":
|
|
||||||
break
|
|
||||||
sleep(1)
|
|
||||||
|
|
||||||
delivery_receipts = []
|
|
||||||
for log in result["results"]:
|
|
||||||
receipt = {field["field"]: field["value"] for field in log}
|
|
||||||
delivery_receipts.append(receipt)
|
|
||||||
return delivery_receipts
|
|
||||||
|
|||||||
@@ -1,7 +1,20 @@
|
|||||||
|
import json
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from sqlalchemy import asc, delete, desc, func, or_, select, text, union, update
|
from sqlalchemy import (
|
||||||
|
TIMESTAMP,
|
||||||
|
asc,
|
||||||
|
cast,
|
||||||
|
delete,
|
||||||
|
desc,
|
||||||
|
func,
|
||||||
|
or_,
|
||||||
|
select,
|
||||||
|
text,
|
||||||
|
union,
|
||||||
|
update,
|
||||||
|
)
|
||||||
from sqlalchemy.orm import joinedload
|
from sqlalchemy.orm import joinedload
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
from sqlalchemy.sql import functions
|
from sqlalchemy.sql import functions
|
||||||
@@ -709,8 +722,25 @@ def get_service_ids_with_notifications_on_date(notification_type, date):
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def dao_update_delivery_receipts(receipts):
|
def dao_update_delivery_receipts(receipts, delivered):
|
||||||
id_to_status = {r["notification.messageId"]: r["status"] for r in receipts}
|
new_receipts = []
|
||||||
|
for r in receipts:
|
||||||
|
if isinstance(r, str):
|
||||||
|
r = json.loads(r)
|
||||||
|
new_receipts.append(r)
|
||||||
|
|
||||||
|
receipts = new_receipts
|
||||||
|
print(receipts)
|
||||||
|
|
||||||
|
statuses = {}
|
||||||
|
for r in receipts:
|
||||||
|
if r["status"].lower() == "success":
|
||||||
|
statuses[r["notification.messageId"]] = NotificationStatus.DELIVERED
|
||||||
|
else:
|
||||||
|
statuses[r["notification.messageId"]] = NotificationStatus.FAILED
|
||||||
|
|
||||||
|
print(f"HERE ARE STATUSES {statuses}")
|
||||||
|
|
||||||
id_to_carrier = {
|
id_to_carrier = {
|
||||||
r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts
|
r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts
|
||||||
}
|
}
|
||||||
@@ -719,16 +749,34 @@ def dao_update_delivery_receipts(receipts):
|
|||||||
}
|
}
|
||||||
id_to_timestamp = {r["notification.messageId"]: r["@timestamp"] for r in receipts}
|
id_to_timestamp = {r["notification.messageId"]: r["@timestamp"] for r in receipts}
|
||||||
|
|
||||||
|
status_to_update_with = NotificationStatus.DELIVERED
|
||||||
|
if not delivered:
|
||||||
|
status_to_update_with = NotificationStatus.FAILED
|
||||||
stmt = (
|
stmt = (
|
||||||
update(Notification)
|
update(Notification)
|
||||||
.where(Notification.c.message_id.in_(id_to_carrier.keys()))
|
.where(Notification.message_id.in_(id_to_carrier.keys()))
|
||||||
.values(
|
.values(
|
||||||
carrier=case(id_to_carrier),
|
carrier=case(
|
||||||
status=case(id_to_status),
|
*[
|
||||||
notification_status=case(id_to_status),
|
(Notification.message_id == key, value)
|
||||||
sent_at=case(id_to_timestamp),
|
for key, value in id_to_carrier.items()
|
||||||
provider_response=case(id_to_provider_response),
|
]
|
||||||
|
),
|
||||||
|
status=status_to_update_with,
|
||||||
|
sent_at=case(
|
||||||
|
*[
|
||||||
|
(Notification.message_id == key, cast(value, TIMESTAMP))
|
||||||
|
for key, value in id_to_timestamp.items()
|
||||||
|
]
|
||||||
|
),
|
||||||
|
provider_response=case(
|
||||||
|
*[
|
||||||
|
(Notification.message_id == key, value)
|
||||||
|
for key, value in id_to_provider_response.items()
|
||||||
|
]
|
||||||
|
),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
print(stmt)
|
||||||
db.session.execute(stmt)
|
db.session.execute(stmt)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|||||||
Reference in New Issue
Block a user