mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 09:51:11 -05:00
more
This commit is contained in:
@@ -1,107 +1,19 @@
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
from datetime import timedelta
|
|
||||||
|
|
||||||
from botocore.exceptions import ClientError
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
|
|
||||||
from app import aws_cloudwatch_client, notify_celery, redis_store
|
from app import notify_celery, redis_store
|
||||||
from app.clients.email import EmailClientNonRetryableException
|
from app.clients.email import EmailClientNonRetryableException
|
||||||
from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException
|
from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException
|
||||||
from app.clients.sms import SmsClientResponseException
|
from app.clients.sms import SmsClientResponseException
|
||||||
from app.config import Config, QueueNames
|
from app.config import Config, QueueNames
|
||||||
from app.dao import notifications_dao
|
from app.dao import notifications_dao
|
||||||
from app.dao.notifications_dao import (
|
from app.dao.notifications_dao import update_notification_status_by_id
|
||||||
sanitize_successful_notification_by_id,
|
|
||||||
update_notification_status_by_id,
|
|
||||||
)
|
|
||||||
from app.delivery import send_to_providers
|
from app.delivery import send_to_providers
|
||||||
from app.enums import NotificationStatus
|
from app.enums import NotificationStatus
|
||||||
from app.exceptions import NotificationTechnicalFailureException
|
from app.exceptions import NotificationTechnicalFailureException
|
||||||
from app.utils import utc_now
|
|
||||||
|
|
||||||
# This is the amount of time to wait after sending an sms message before we check the aws logs and look for delivery
|
|
||||||
# receipts
|
|
||||||
DELIVERY_RECEIPT_DELAY_IN_SECONDS = 30
|
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(
|
|
||||||
bind=True,
|
|
||||||
name="check_sms_delivery_receipt",
|
|
||||||
max_retries=48,
|
|
||||||
default_retry_delay=300,
|
|
||||||
)
|
|
||||||
def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
|
|
||||||
"""
|
|
||||||
This is called after deliver_sms to check the status of the message. This uses the same number of
|
|
||||||
retries and the same delay period as deliver_sms. In addition, this fires five minutes after
|
|
||||||
deliver_sms initially. So the idea is that most messages will succeed and show up in the logs quickly.
|
|
||||||
Other message will resolve successfully after a retry or to. A few will fail but it will take up to
|
|
||||||
4 hours to know for sure. The call to check_sms will raise an exception if neither a success nor a
|
|
||||||
failure appears in the cloudwatch logs, so this should keep retrying until the log appears, or until
|
|
||||||
we run out of retries.
|
|
||||||
"""
|
|
||||||
# TODO the localstack cloudwatch doesn't currently have our log groups. Possibly create them with awslocal?
|
|
||||||
if aws_cloudwatch_client.is_localstack():
|
|
||||||
status = "success"
|
|
||||||
provider_response = "this is a fake successful localstack sms message"
|
|
||||||
carrier = "unknown"
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
status, provider_response, carrier = aws_cloudwatch_client.check_sms(
|
|
||||||
message_id, notification_id, sent_at
|
|
||||||
)
|
|
||||||
except NotificationTechnicalFailureException as ntfe:
|
|
||||||
provider_response = "Unable to find carrier response -- still looking"
|
|
||||||
status = "pending"
|
|
||||||
carrier = ""
|
|
||||||
update_notification_status_by_id(
|
|
||||||
notification_id,
|
|
||||||
status,
|
|
||||||
carrier=carrier,
|
|
||||||
provider_response=provider_response,
|
|
||||||
)
|
|
||||||
raise self.retry(exc=ntfe)
|
|
||||||
except ClientError as err:
|
|
||||||
# Probably a ThrottlingException but could be something else
|
|
||||||
error_code = err.response["Error"]["Code"]
|
|
||||||
provider_response = (
|
|
||||||
f"{error_code} while checking sms receipt -- still looking"
|
|
||||||
)
|
|
||||||
status = "pending"
|
|
||||||
carrier = ""
|
|
||||||
update_notification_status_by_id(
|
|
||||||
notification_id,
|
|
||||||
status,
|
|
||||||
carrier=carrier,
|
|
||||||
provider_response=provider_response,
|
|
||||||
)
|
|
||||||
raise self.retry(exc=err)
|
|
||||||
|
|
||||||
if status == "success":
|
|
||||||
status = NotificationStatus.DELIVERED
|
|
||||||
elif status == "failure":
|
|
||||||
status = NotificationStatus.FAILED
|
|
||||||
# if status is not success or failure the client raised an exception and this method will retry
|
|
||||||
|
|
||||||
if status == NotificationStatus.DELIVERED:
|
|
||||||
sanitize_successful_notification_by_id(
|
|
||||||
notification_id, carrier=carrier, provider_response=provider_response
|
|
||||||
)
|
|
||||||
current_app.logger.info(
|
|
||||||
f"Sanitized notification {notification_id} that was successfully delivered"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
update_notification_status_by_id(
|
|
||||||
notification_id,
|
|
||||||
status,
|
|
||||||
carrier=carrier,
|
|
||||||
provider_response=provider_response,
|
|
||||||
)
|
|
||||||
current_app.logger.info(
|
|
||||||
f"Updated notification {notification_id} with response '{provider_response}'"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(
|
@notify_celery.task(
|
||||||
@@ -127,17 +39,8 @@ def deliver_sms(self, notification_id):
|
|||||||
ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
|
ansi_green + f"AUTHENTICATION CODE: {notification.content}" + ansi_reset
|
||||||
)
|
)
|
||||||
# Code branches off to send_to_providers.py
|
# Code branches off to send_to_providers.py
|
||||||
message_id = send_to_providers.send_sms_to_provider(notification)
|
send_to_providers.send_sms_to_provider(notification)
|
||||||
|
|
||||||
# DEPRECATED
|
|
||||||
# We have to put it in UTC. For other timezones, the delay
|
|
||||||
# will be ignored and it will fire immediately (although this probably only affects developer testing)
|
|
||||||
my_eta = utc_now() + timedelta(seconds=DELIVERY_RECEIPT_DELAY_IN_SECONDS)
|
|
||||||
check_sms_delivery_receipt.apply_async(
|
|
||||||
[message_id, notification_id, notification.created_at],
|
|
||||||
eta=my_eta,
|
|
||||||
queue=QueueNames.CHECK_SMS,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
update_notification_status_by_id(
|
update_notification_status_by_id(
|
||||||
notification_id,
|
notification_id,
|
||||||
|
|||||||
@@ -110,65 +110,6 @@ class AwsCloudwatchClient(Client):
|
|||||||
return logline
|
return logline
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# DEPRECATED
|
|
||||||
def check_sms(self, message_id, notification_id, created_at):
|
|
||||||
region = cloud_config.sns_region
|
|
||||||
# TODO this clumsy approach to getting the account number will be fixed as part of notify-api #258
|
|
||||||
account_number = self._extract_account_number(cloud_config.ses_domain_arn)
|
|
||||||
|
|
||||||
time_now = utc_now()
|
|
||||||
log_group_name = f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber"
|
|
||||||
filter_pattern = '{$.notification.messageId="XXXXX"}'
|
|
||||||
filter_pattern = filter_pattern.replace("XXXXX", message_id)
|
|
||||||
all_log_events = self._get_log(filter_pattern, log_group_name, created_at)
|
|
||||||
if all_log_events and len(all_log_events) > 0:
|
|
||||||
event = all_log_events[0]
|
|
||||||
message = json.loads(event["message"])
|
|
||||||
self.warn_if_dev_is_opted_out(
|
|
||||||
message["delivery"]["providerResponse"], notification_id
|
|
||||||
)
|
|
||||||
# Here we map the answer from aws to the message_id.
|
|
||||||
# Previously, in send_to_providers, we mapped the job_id and row number
|
|
||||||
# to the message id. And on the admin side we mapped the csv filename
|
|
||||||
# to the job_id. So by tracing through all the logs we can go:
|
|
||||||
# filename->job_id->message_id->what really happened
|
|
||||||
current_app.logger.info(
|
|
||||||
hilite(f"DELIVERED: {message} for message_id {message_id}")
|
|
||||||
)
|
|
||||||
return (
|
|
||||||
"success",
|
|
||||||
message["delivery"]["providerResponse"],
|
|
||||||
message["delivery"].get("phoneCarrier", "Unknown Carrier"),
|
|
||||||
)
|
|
||||||
|
|
||||||
log_group_name = (
|
|
||||||
f"sns/{region}/{account_number[4]}/DirectPublishToPhoneNumber/Failure"
|
|
||||||
)
|
|
||||||
all_failed_events = self._get_log(filter_pattern, log_group_name, created_at)
|
|
||||||
if all_failed_events and len(all_failed_events) > 0:
|
|
||||||
event = all_failed_events[0]
|
|
||||||
message = json.loads(event["message"])
|
|
||||||
self.warn_if_dev_is_opted_out(
|
|
||||||
message["delivery"]["providerResponse"], notification_id
|
|
||||||
)
|
|
||||||
|
|
||||||
current_app.logger.info(
|
|
||||||
hilite(f"FAILED: {message} for message_id {message_id}")
|
|
||||||
)
|
|
||||||
return (
|
|
||||||
"failure",
|
|
||||||
message["delivery"]["providerResponse"],
|
|
||||||
message["delivery"].get("phoneCarrier", "Unknown Carrier"),
|
|
||||||
)
|
|
||||||
|
|
||||||
if time_now > (created_at + timedelta(hours=3)):
|
|
||||||
# see app/models.py Notification. This message corresponds to "permanent-failure",
|
|
||||||
# but we are copy/pasting here to avoid circular imports.
|
|
||||||
return "failure", "Unable to find carrier response."
|
|
||||||
raise NotificationTechnicalFailureException(
|
|
||||||
f"No event found for message_id {message_id} notification_id {notification_id}"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def do_log_insights(self):
|
def do_log_insights(self):
|
||||||
region = cloud_config.sns_region
|
region = cloud_config.sns_region
|
||||||
|
|||||||
@@ -7,11 +7,7 @@ from celery.exceptions import MaxRetriesExceededError
|
|||||||
|
|
||||||
import app
|
import app
|
||||||
from app.celery import provider_tasks
|
from app.celery import provider_tasks
|
||||||
from app.celery.provider_tasks import (
|
from app.celery.provider_tasks import deliver_email, deliver_sms
|
||||||
check_sms_delivery_receipt,
|
|
||||||
deliver_email,
|
|
||||||
deliver_sms,
|
|
||||||
)
|
|
||||||
from app.clients.email import EmailClientNonRetryableException
|
from app.clients.email import EmailClientNonRetryableException
|
||||||
from app.clients.email.aws_ses import (
|
from app.clients.email.aws_ses import (
|
||||||
AwsSesClientException,
|
AwsSesClientException,
|
||||||
@@ -27,110 +23,10 @@ def test_should_have_decorated_tasks_functions():
|
|||||||
assert deliver_email.__wrapped__.__name__ == "deliver_email"
|
assert deliver_email.__wrapped__.__name__ == "deliver_email"
|
||||||
|
|
||||||
|
|
||||||
def test_should_check_delivery_receipts_success(sample_notification, mocker):
|
|
||||||
mocker.patch("app.delivery.send_to_providers.send_sms_to_provider")
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.is_localstack",
|
|
||||||
return_value=False,
|
|
||||||
)
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.check_sms",
|
|
||||||
return_value=("success", "okay", "AT&T"),
|
|
||||||
)
|
|
||||||
mock_sanitize = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.sanitize_successful_notification_by_id"
|
|
||||||
)
|
|
||||||
check_sms_delivery_receipt(
|
|
||||||
"message_id", sample_notification.id, "2024-10-20 00:00:00+0:00"
|
|
||||||
)
|
|
||||||
# This call should be made if the message was successfully delivered
|
|
||||||
mock_sanitize.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_should_check_delivery_receipts_failure(sample_notification, mocker):
|
|
||||||
mocker.patch("app.delivery.send_to_providers.send_sms_to_provider")
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.is_localstack",
|
|
||||||
return_value=False,
|
|
||||||
)
|
|
||||||
mock_update = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.update_notification_status_by_id"
|
|
||||||
)
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.check_sms",
|
|
||||||
return_value=("failure", "not okay", "AT&T"),
|
|
||||||
)
|
|
||||||
mock_sanitize = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.sanitize_successful_notification_by_id"
|
|
||||||
)
|
|
||||||
check_sms_delivery_receipt(
|
|
||||||
"message_id", sample_notification.id, "2024-10-20 00:00:00+0:00"
|
|
||||||
)
|
|
||||||
mock_sanitize.assert_not_called()
|
|
||||||
mock_update.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_should_check_delivery_receipts_client_error(sample_notification, mocker):
|
|
||||||
mocker.patch("app.delivery.send_to_providers.send_sms_to_provider")
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.is_localstack",
|
|
||||||
return_value=False,
|
|
||||||
)
|
|
||||||
mock_update = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.update_notification_status_by_id"
|
|
||||||
)
|
|
||||||
error_response = {"Error": {"Code": "SomeCode", "Message": "Some Message"}}
|
|
||||||
operation_name = "SomeOperation"
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.check_sms",
|
|
||||||
side_effect=ClientError(error_response, operation_name),
|
|
||||||
)
|
|
||||||
mock_sanitize = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.sanitize_successful_notification_by_id"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
check_sms_delivery_receipt(
|
|
||||||
"message_id", sample_notification.id, "2024-10-20 00:00:00+0:00"
|
|
||||||
)
|
|
||||||
|
|
||||||
assert 1 == 0
|
|
||||||
except ClientError:
|
|
||||||
mock_sanitize.assert_not_called()
|
|
||||||
mock_update.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_should_check_delivery_receipts_ntfe(sample_notification, mocker):
|
|
||||||
mocker.patch("app.delivery.send_to_providers.send_sms_to_provider")
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.is_localstack",
|
|
||||||
return_value=False,
|
|
||||||
)
|
|
||||||
mock_update = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.update_notification_status_by_id"
|
|
||||||
)
|
|
||||||
mocker.patch(
|
|
||||||
"app.celery.provider_tasks.aws_cloudwatch_client.check_sms",
|
|
||||||
side_effect=NotificationTechnicalFailureException(),
|
|
||||||
)
|
|
||||||
mock_sanitize = mocker.patch(
|
|
||||||
"app.celery.provider_tasks.sanitize_successful_notification_by_id"
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
check_sms_delivery_receipt(
|
|
||||||
"message_id", sample_notification.id, "2024-10-20 00:00:00+0:00"
|
|
||||||
)
|
|
||||||
|
|
||||||
assert 1 == 0
|
|
||||||
except NotificationTechnicalFailureException:
|
|
||||||
mock_sanitize.assert_not_called()
|
|
||||||
mock_update.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
def test_should_call_send_sms_to_provider_from_deliver_sms_task(
|
def test_should_call_send_sms_to_provider_from_deliver_sms_task(
|
||||||
sample_notification, mocker
|
sample_notification, mocker
|
||||||
):
|
):
|
||||||
mocker.patch("app.delivery.send_to_providers.send_sms_to_provider")
|
mocker.patch("app.delivery.send_to_providers.send_sms_to_provider")
|
||||||
mocker.patch("app.celery.provider_tasks.check_sms_delivery_receipt")
|
|
||||||
|
|
||||||
deliver_sms(sample_notification.id)
|
deliver_sms(sample_notification.id)
|
||||||
app.delivery.send_to_providers.send_sms_to_provider.assert_called_with(
|
app.delivery.send_to_providers.send_sms_to_provider.assert_called_with(
|
||||||
|
|||||||
Reference in New Issue
Block a user