diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 94bea4bea..b14d50ee5 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -35,7 +35,8 @@ from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.enums import JobStatus, NotificationType from app.models import Job from app.notifications.process_notifications import send_notification_to_queue -from app.utils import utc_now +from app.utils import hilite, utc_now +from notifications_utils import aware_utcnow from notifications_utils.clients.zendesk.zendesk_client import NotifySupportTicket MAX_NOTIFICATION_FAILS = 10000 @@ -239,12 +240,21 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(): @notify_celery.task(name="process-delivery-receipts") def process_delivery_receipts(): + print(hilite("ENTER PROCESS DELIVERY RECEIPTS")) cloudwatch = AwsCloudwatchClient() cloudwatch.init_app(current_app) - start_time = utc_now() - timedelta(minutes=10) - end_time = utc_now() - receipts = cloudwatch.check_delivery_receipts(start_time, end_time) + start_time = aware_utcnow() - timedelta(minutes=10) + end_time = aware_utcnow() + delivered_receipts, failed_receipts = cloudwatch.check_delivery_receipts( + start_time, end_time + ) + delivered_receipts = list(delivered_receipts) batch_size = 100 - for i in range(0, len(receipts), batch_size): - batch = receipts[i : i + batch_size] - dao_update_delivery_receipts(batch) + for i in range(0, len(delivered_receipts), batch_size): + batch = delivered_receipts[i : i + batch_size] + dao_update_delivery_receipts(batch, True) + failed_receipts = list(failed_receipts) + batch_size = 100 + for i in range(0, len(failed_receipts), batch_size): + batch = failed_receipts[i : i + batch_size] + dao_update_delivery_receipts(batch, False) diff --git a/app/clients/cloudwatch/aws_cloudwatch.py b/app/clients/cloudwatch/aws_cloudwatch.py index cd510e9dd..d84044943 100644 --- a/app/clients/cloudwatch/aws_cloudwatch.py +++ b/app/clients/cloudwatch/aws_cloudwatch.py @@ -1,12 +1,12 @@ +import json import os import re -from time import sleep from boto3 import client -from flask import current_app from app.clients import AWS_CLIENT_CONFIG, Client from app.cloudfoundry_config import cloud_config +from app.utils import hilite class AwsCloudwatchClient(Client): @@ -46,63 +46,116 @@ class AwsCloudwatchClient(Client): def is_localstack(self): return self._is_localstack + def _get_log(self, log_group_name, start, end): + # Check all cloudwatch logs from the time the notification was sent (currently 5 minutes previously) until now + print(hilite(f"START {start} END {end}")) + next_token = None + all_log_events = [] + + while True: + if next_token: + response = self._client.filter_log_events( + logGroupName=log_group_name, + nextToken=next_token, + startTime=int(start.timestamp() * 1000), + endTime=int(end.timestamp() * 1000), + ) + else: + response = self._client.filter_log_events( + logGroupName=log_group_name, + startTime=int(start.timestamp() * 1000), + endTime=int(end.timestamp() * 1000), + ) + log_events = response.get("events", []) + all_log_events.extend(log_events) + next_token = response.get("nextToken") + if not next_token: + break + return all_log_events + def _extract_account_number(self, ses_domain_arn): account_number = ses_domain_arn.split(":") return account_number - def warn_if_dev_is_opted_out(self, provider_response, notification_id): - if ( - "is opted out" in provider_response.lower() - or "has blocked sms" in provider_response.lower() - ): - if os.getenv("NOTIFY_ENVIRONMENT") in ["development", "test"]: - ansi_red = "\033[31m" - ansi_reset = "\033[0m" - logline = ( - ansi_red - + f"The phone number for notification_id {notification_id} is OPTED OUT. You need to opt back in" - + ansi_reset - ) - current_app.logger.warning(logline) - return logline - return None + def event_to_db_format(self, event): + # massage the data into the form the db expects. When we switch + # from filter_log_events to log insights this will be convenient + if isinstance(event, str): + event = json.loads(event) + + return { + "notification.messageId": event["notification"]["messageId"], + "status": event["status"], + "delivery.phoneCarrier": event["delivery"]["phoneCarrier"], + "delivery.providerResponse": event["delivery"]["providerResponse"], + "@timestamp": event["notification"]["timestamp"], + } + + # Here is an example of how to get the events with log insights + # def do_log_insights(): + # query = """ + # fields @timestamp, status, message, recipient + # | filter status = "DELIVERED" + # | sort @timestamp asc + # """ + # temp_client = boto3.client( + # "logs", + # region_name="us-gov-west-1", + # aws_access_key_id=AWS_ACCESS_KEY_ID, + # aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + # config=AWS_CLIENT_CONFIG, + # ) + # start = utc_now() + # end = utc_now - timedelta(hours=1) + # response = temp_client.start_query( + # logGroupName = LOG_GROUP_NAME_DELIVERED, + # startTime = int(start.timestamp()), + # endTime= int(end.timestamp()), + # queryString = query + + # ) + # query_id = response['queryId'] + # while True: + # result = temp_client.get_query_results(queryId=query_id) + # if result['status'] == 'Complete': + # break + # time.sleep(1) + + # delivery_receipts = [] + # for log in result['results']: + # receipt = {field['field']: field['value'] for field in log} + # delivery_receipts.append(receipt) + # print(receipt) + + # print(len(delivery_receipts)) + + # In the long run we want to use Log Insights because it is more efficient + # that filter_log_events. But we are blocked by a permissions issue in the broker. + # So for now, use filter_log_events and grab all log_events over a 10 minute interval, + # and run this on a schedule. def check_delivery_receipts(self, start, end): - region = cloud_config.sns_region + # TODO this clumsy approach to getting the account number will be fixed as part of notify-api #258 account_number = self._extract_account_number(cloud_config.ses_domain_arn) - + delivered_event_set = set() log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber" - log_group_name_failed = ( - f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failed" + print(hilite(f"LOG GROUP NAME {log_group_name}")) + all_delivered_events = self._get_log(log_group_name, start, end) + print(f"ALL DELIVEREDS {len(all_delivered_events)}") + + for event in all_delivered_events: + actual_event = self.event_to_db_format(event["message"]) + delivered_event_set.add(json.dumps(actual_event)) + + failed_event_set = set() + log_group_name = ( + f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failure" ) + all_failed_events = self._get_log(log_group_name, start, end) + print(f"ALL FAILEDS {len(all_failed_events)}") + for event in all_failed_events: + actual_event = self.event_to_db_format(event["message"]) + failed_event_set.add(json.dumps(actual_event)) - query = """ - fields @timestamp, status, delivery.providerResponse, delivery.destination, - notification.messageId, delivery.phoneCarrier - | sort @timestamp asc - """ - - delivered = self.run_log_insights_query(log_group_name, start, end, query) - failed = self.run_log_insights_query(log_group_name_failed, start, end, query) - return delivered + failed - - def run_log_insights_query(self, log_group_name, start, end, query): - response = self._client.start_query( - logGroupName=log_group_name, - startTime=int(start.timestamp()), - endTime=int(end.timestamp()), - queryString=query, - ) - query_id = response["queryId"] - while True: - result = self._client.get_query_results(queryId=query_id) - if result["status"] == "Complete": - break - sleep(1) - - delivery_receipts = [] - for log in result["results"]: - receipt = {field["field"]: field["value"] for field in log} - delivery_receipts.append(receipt) - return delivery_receipts + return delivered_event_set, failed_event_set diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index d3871c59b..ef338dc4d 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -1,7 +1,20 @@ +import json from datetime import timedelta from flask import current_app -from sqlalchemy import asc, delete, desc, func, or_, select, text, union, update +from sqlalchemy import ( + TIMESTAMP, + asc, + cast, + delete, + desc, + func, + or_, + select, + text, + union, + update, +) from sqlalchemy.orm import joinedload from sqlalchemy.orm.exc import NoResultFound from sqlalchemy.sql import functions @@ -709,8 +722,25 @@ def get_service_ids_with_notifications_on_date(notification_type, date): } -def dao_update_delivery_receipts(receipts): - id_to_status = {r["notification.messageId"]: r["status"] for r in receipts} +def dao_update_delivery_receipts(receipts, delivered): + new_receipts = [] + for r in receipts: + if isinstance(r, str): + r = json.loads(r) + new_receipts.append(r) + + receipts = new_receipts + print(receipts) + + statuses = {} + for r in receipts: + if r["status"].lower() == "success": + statuses[r["notification.messageId"]] = NotificationStatus.DELIVERED + else: + statuses[r["notification.messageId"]] = NotificationStatus.FAILED + + print(f"HERE ARE STATUSES {statuses}") + id_to_carrier = { r["notification.messageId"]: r["delivery.phoneCarrier"] for r in receipts } @@ -719,16 +749,34 @@ def dao_update_delivery_receipts(receipts): } id_to_timestamp = {r["notification.messageId"]: r["@timestamp"] for r in receipts} + status_to_update_with = NotificationStatus.DELIVERED + if not delivered: + status_to_update_with = NotificationStatus.FAILED stmt = ( update(Notification) - .where(Notification.c.message_id.in_(id_to_carrier.keys())) + .where(Notification.message_id.in_(id_to_carrier.keys())) .values( - carrier=case(id_to_carrier), - status=case(id_to_status), - notification_status=case(id_to_status), - sent_at=case(id_to_timestamp), - provider_response=case(id_to_provider_response), + carrier=case( + *[ + (Notification.message_id == key, value) + for key, value in id_to_carrier.items() + ] + ), + status=status_to_update_with, + sent_at=case( + *[ + (Notification.message_id == key, cast(value, TIMESTAMP)) + for key, value in id_to_timestamp.items() + ] + ), + provider_response=case( + *[ + (Notification.message_id == key, value) + for key, value in id_to_provider_response.items() + ] + ), ) ) + print(stmt) db.session.execute(stmt) db.session.commit()