merge from main

Kenneth Kehl
2023-07-06 13:45:53 -07:00
25 changed files with 389 additions and 156 deletions

View File

@@ -1,6 +1,5 @@
 from datetime import datetime, timedelta
 from time import time
-from zoneinfo import ZoneInfo
 from flask import current_app
 from sqlalchemy.orm.exc import NoResultFound
@@ -12,14 +11,14 @@ from app.clients.sms import SmsClientResponseException
 from app.config import QueueNames
 from app.dao import notifications_dao
 from app.dao.notifications_dao import (
-    insert_notification_history_delete_notifications_by_id,
+    sanitize_successful_notification_by_id,
     update_notification_status_by_id,
 )
 from app.delivery import send_to_providers
 from app.exceptions import NotificationTechnicalFailureException
 from app.models import (
+    NOTIFICATION_DELIVERED,
     NOTIFICATION_FAILED,
-    NOTIFICATION_SENT,
     NOTIFICATION_TECHNICAL_FAILURE,
 )
@@ -37,15 +36,17 @@ def check_sms_delivery_receipt(self, message_id, notification_id, sent_at):
"""
status, provider_response = aws_cloudwatch_client.check_sms(message_id, notification_id, sent_at)
if status == 'success':
status = NOTIFICATION_SENT
else:
status = NOTIFICATION_DELIVERED
elif status == 'failure':
status = NOTIFICATION_FAILED
update_notification_status_by_id(notification_id, status, provider_response=provider_response)
current_app.logger.info(f"Updated notification {notification_id} with response '{provider_response}'")
# if status is not success or failure the client raised an exception and this method will retry
if status == NOTIFICATION_SENT:
insert_notification_history_delete_notifications_by_id(notification_id)
current_app.logger.info(f"Archived notification {notification_id} that was successfully sent")
if status == NOTIFICATION_DELIVERED:
sanitize_successful_notification_by_id(notification_id)
current_app.logger.info(f"Sanitized notification {notification_id} that was successfully delivered")
else:
update_notification_status_by_id(notification_id, status, provider_response=provider_response)
current_app.logger.info(f"Updated notification {notification_id} with response '{provider_response}'")
@notify_celery.task(bind=True, name="deliver_sms", max_retries=48, default_retry_delay=300)
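Assembled for readability, the receipt handling above maps the CloudWatch result onto a notification status and then either sanitizes or updates the row. The sketch below is only a reading of this hunk: the constant values and the two stubbed dao helpers are illustrative stand-ins so it runs on its own; the real ones come from app.models and app.dao.notifications_dao.

    # Illustrative constant values; the real ones live in app.models.
    NOTIFICATION_DELIVERED = "delivered"
    NOTIFICATION_FAILED = "failed"

    def sanitize_successful_notification_by_id(notification_id):  # stub for the dao helper
        print(f"sanitizing {notification_id}")

    def update_notification_status_by_id(notification_id, status, provider_response=None):  # stub
        print(f"updating {notification_id} -> {status!r} ({provider_response!r})")

    def handle_receipt(raw_status, provider_response, notification_id):
        # Mirrors the hunk: 'success' -> delivered, 'failure' -> failed. Any other
        # value never reaches this point because check_sms raises and the task retries.
        status = raw_status
        if raw_status == "success":
            status = NOTIFICATION_DELIVERED
        elif raw_status == "failure":
            status = NOTIFICATION_FAILED
        if status == NOTIFICATION_DELIVERED:
            sanitize_successful_notification_by_id(notification_id)
        else:
            update_notification_status_by_id(notification_id, status, provider_response=provider_response)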
@@ -58,9 +59,9 @@ def deliver_sms(self, notification_id):
         if not notification:
             raise NoResultFound()
         message_id = send_to_providers.send_sms_to_provider(notification)
-        # We have to put it in the default US/Eastern timezone. From zones west of there, the delay
+        # We have to put it in UTC. For other timezones, the delay
         # will be ignored and it will fire immediately (although this probably only affects developer testing)
-        my_eta = datetime.now(ZoneInfo('US/Eastern')) + timedelta(seconds=300)
+        my_eta = datetime.utcnow() + timedelta(seconds=300)
         check_sms_delivery_receipt.apply_async(
             [message_id, notification_id, now],
             eta=my_eta,
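The naive datetime.utcnow() ETA lines up with the UTC timezone this merge sets in the Celery config. A timezone-aware equivalent, sketched below, avoids relying on that config default and also sidesteps the datetime.utcnow() deprecation in newer Python releases; only the five-minute delay is taken from the diff.

    from datetime import datetime, timedelta, timezone

    def five_minutes_from_now():
        # Aware UTC timestamp; Celery accepts aware datetimes for eta scheduling.
        return datetime.now(timezone.utc) + timedelta(seconds=300)

    # e.g. check_sms_delivery_receipt.apply_async([message_id, notification_id, now],
    #                                             eta=five_minutes_from_now())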

View File

@@ -84,6 +84,6 @@ class AwsCloudwatchClient(Client):
         if all_failed_events and len(all_failed_events) > 0:
             event = all_failed_events[0]
             message = json.loads(event['message'])
-            return "fail", message['delivery']['providerResponse']
+            return "failure", message['delivery']['providerResponse']
         raise Exception(f'No event found for message_id {message_id} notification_id {notification_id}')
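The literal returned here has to agree with the `elif status == 'failure'` branch in check_sms_delivery_receipt above, which is what this one-word change fixes. One way to keep the two sides from drifting again is a shared constant; the names in this sketch are invented for illustration and are not part of the diff.

    # Hypothetical shared constants, e.g. alongside the SMS client definitions.
    SMS_RECEIPT_SUCCESS = "success"
    SMS_RECEIPT_FAILURE = "failure"

    # client side:  return SMS_RECEIPT_FAILURE, message['delivery']['providerResponse']
    # task side:    elif status == SMS_RECEIPT_FAILURE: status = NOTIFICATION_FAILED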

View File

@@ -162,7 +162,7 @@ class Config(object):
         'broker_transport_options': {
             'visibility_timeout': 310,
         },
-        'timezone': getenv("TIMEZONE", 'America/New_York'),
+        'timezone': getenv("TIMEZONE", 'UTC'),
         'imports': [
             'app.celery.tasks',
             'app.celery.scheduled_tasks',
@@ -213,47 +213,47 @@ class Config(object):
             # app/celery/nightly_tasks.py
             'timeout-sending-notifications': {
                 'task': 'timeout-sending-notifications',
-                'schedule': crontab(hour=0, minute=5),
+                'schedule': crontab(hour=4, minute=5),
                 'options': {'queue': QueueNames.PERIODIC}
             },
             'create-nightly-billing': {
                 'task': 'create-nightly-billing',
-                'schedule': crontab(hour=0, minute=15),
+                'schedule': crontab(hour=4, minute=15),
                 'options': {'queue': QueueNames.REPORTING}
             },
             'create-nightly-notification-status': {
                 'task': 'create-nightly-notification-status',
-                'schedule': crontab(hour=0, minute=30),  # after 'timeout-sending-notifications'
+                'schedule': crontab(hour=4, minute=30),  # after 'timeout-sending-notifications'
                 'options': {'queue': QueueNames.REPORTING}
             },
             'delete-notifications-older-than-retention': {
                 'task': 'delete-notifications-older-than-retention',
-                'schedule': crontab(hour=3, minute=0),  # after 'create-nightly-notification-status'
+                'schedule': crontab(hour=7, minute=0),  # after 'create-nightly-notification-status'
                 'options': {'queue': QueueNames.REPORTING}
             },
             'delete-inbound-sms': {
                 'task': 'delete-inbound-sms',
-                'schedule': crontab(hour=1, minute=40),
+                'schedule': crontab(hour=5, minute=40),
                 'options': {'queue': QueueNames.PERIODIC}
             },
             'save-daily-notification-processing-time': {
                 'task': 'save-daily-notification-processing-time',
-                'schedule': crontab(hour=2, minute=0),
+                'schedule': crontab(hour=6, minute=0),
                 'options': {'queue': QueueNames.PERIODIC}
             },
             'cleanup-unfinished-jobs': {
                 'task': 'cleanup-unfinished-jobs',
-                'schedule': crontab(hour=0, minute=5),
+                'schedule': crontab(hour=4, minute=5),
                 'options': {'queue': QueueNames.PERIODIC}
             },
             'remove_sms_email_jobs': {
                 'task': 'remove_sms_email_jobs',
-                'schedule': crontab(hour=4, minute=0),
+                'schedule': crontab(hour=8, minute=0),
                 'options': {'queue': QueueNames.PERIODIC},
             },
             'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers': {
                 'task': 'check-for-services-with-high-failure-rates-or-sending-to-tv-numbers',
-                'schedule': crontab(day_of_week='mon-fri', hour=10, minute=30),
+                'schedule': crontab(day_of_week='mon-fri', hour=14, minute=30),
                 'options': {'queue': QueueNames.PERIODIC}
             },
         }
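Celery beat evaluates these crontab hours in the configured 'timezone', which this merge switches to UTC, so every schedule moves four hours later to keep its former Eastern wall-clock slot. Note that the +4 offset only matches while daylight saving time is in effect; a quick check of the conversion:

    from datetime import datetime
    from zoneinfo import ZoneInfo

    # 04:05 UTC is 00:05 in New York while daylight saving time applies...
    summer = datetime(2023, 7, 6, 4, 5, tzinfo=ZoneInfo("UTC")).astimezone(ZoneInfo("America/New_York"))
    print(summer.strftime("%H:%M %Z"))   # 00:05 EDT

    # ...but 23:05 the previous evening once the offset drops to UTC-5 in winter.
    winter = datetime(2023, 12, 6, 4, 5, tzinfo=ZoneInfo("UTC")).astimezone(ZoneInfo("America/New_York"))
    print(winter.strftime("%H:%M %Z"))   # 23:05 EST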

View File

@@ -259,8 +259,8 @@ def _filter_query(query, filter_dict=None):
     # filter by status
     statuses = multidict.getlist('status')
     if statuses:
         statuses = Notification.substitute_status(statuses)
         query = query.filter(Notification.status.in_(statuses))

     # filter by template
@@ -272,34 +272,27 @@ def _filter_query(query, filter_dict=None):
 @autocommit
-def insert_notification_history_delete_notifications_by_id(
+def sanitize_successful_notification_by_id(
     notification_id
 ):
-    """
-    Deletes one notification after it has run successfully and moves it to the notification_history
-    table.
-    """
+    # TODO what to do for international?
+    # phone_prefix = '1'
+    # Notification.query.filter(
+    #     Notification.id.in_([notification_id]),
+    # ).update(
+    #     {'to': phone_prefix, 'normalised_to': phone_prefix, 'status': 'delivered'}
+    # )
+    # db.session.commit()
+    update_query = """
+        update notifications set notification_status='delivered', "to"='1', normalised_to='1'
+        where id=:notification_id
+    """
     input_params = {
         "notification_id": notification_id
     }
-    # Insert into NotificationHistory if the row already exists do nothing.
-    insert_query = """
-        insert into notification_history
-        SELECT id, job_id, job_row_number, service_id, template_id, template_version, api_key_id,
-        key_type, notification_type, created_at, sent_at, sent_by, updated_at, reference, billable_units,
-        client_reference, international, phone_prefix, rate_multiplier, notification_status,
-        created_by_id, document_download_count
-        from NOTIFICATIONS WHERE id= :notification_id
-        ON CONFLICT ON CONSTRAINT notification_history_pkey
-        DO NOTHING
-    """
-    delete_query = """
-        DELETE FROM notifications
-        where id= :notification_id
-    """
-    db.session.execute(insert_query, input_params)
-    db.session.execute(delete_query, input_params)
+    db.session.execute(update_query, input_params)

 @autocommit
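The new helper executes a plain SQL string with a parameter dict. If the project moves to SQLAlchemy 2.0, the bare-string form is no longer accepted by session.execute() and the statement needs an explicit text() wrapper for the named :notification_id bind to work; a small compatibility sketch, assuming the same Flask-SQLAlchemy db object as above:

    from sqlalchemy import text

    UPDATE_SANITIZE = text(
        """
        update notifications set notification_status='delivered', "to"='1', normalised_to='1'
        where id = :notification_id
        """
    )

    # Bound the same way as in the diff, e.g.:
    #     db.session.execute(UPDATE_SANITIZE, {"notification_id": notification_id})
    # text() keeps the named parameter working on both SQLAlchemy 1.4 and 2.0.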

View File

@@ -28,7 +28,6 @@ from app.models import (
     EMAIL_TYPE,
     KEY_TYPE_TEST,
     NOTIFICATION_SENDING,
-    NOTIFICATION_SENT,
     NOTIFICATION_STATUS_TYPES_COMPLETED,
     NOTIFICATION_TECHNICAL_FAILURE,
     SMS_TYPE,
@@ -137,9 +136,7 @@ def update_notification_to_sending(notification, provider):
     notification.sent_at = datetime.utcnow()
     notification.sent_by = provider.name
     if notification.status not in NOTIFICATION_STATUS_TYPES_COMPLETED:
-        # We currently have no callback method for SMS deliveries
-        # TODO create celery task to request SMS delivery receipts from cloudwatch api
-        notification.status = NOTIFICATION_SENT if notification.notification_type == "sms" else NOTIFICATION_SENDING
+        notification.status = NOTIFICATION_SENDING
     dao_update_notification(notification)
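With this change an SMS leaves the provider hand-off in the sending state and is only finalized later by the delayed check_sms_delivery_receipt task. A tiny sketch of that two-step lifecycle; the status strings are illustrative, matching the constant names used elsewhere in the diff rather than quoting app.models.

    # Illustrative lifecycle after this merge: provider hand-off marks the row
    # 'sending'; the receipt check moves it to a terminal state five minutes later.
    LIFECYCLE = {
        "send_sms_to_provider": "sending",                    # update_notification_to_sending
        "check_sms_delivery_receipt (success)": "delivered",  # sanitized + marked delivered
        "check_sms_delivery_receipt (failure)": "failed",     # provider response recorded
    }

    for step, status in LIFECYCLE.items():
        print(f"{step:45} -> {status}")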

View File

@@ -121,8 +121,6 @@ def persist_notification(
         updated_at=updated_at
     )
-    current_app.logger.info('Persisting notification with to address: {}'.format(notification.to))
     if notification_type == SMS_TYPE:
         formatted_recipient = validate_and_format_phone_number(recipient, international=True)
         recipient_info = get_international_phone_info(formatted_recipient)
@@ -133,7 +131,6 @@ def persist_notification(
     elif notification_type == EMAIL_TYPE:
         current_app.logger.info('Persisting notification with type: {}'.format(EMAIL_TYPE))
         notification.normalised_to = format_email_address(notification.to)
-        current_app.logger.info('Persisting notification to formatted email: {}'.format(notification.normalised_to))
     # if simulated create a Notification model to return but do not persist the Notification to the dB
     if not simulated:

View File

@@ -395,7 +395,6 @@ def get_all_notifications_for_service(service_id):
         notifications = [notification.serialize_for_csv() for notification in pagination.items]
     else:
         notifications = notification_with_template_schema.dump(pagination.items, many=True)
     # We try and get the next page of results to work out if we need provide a pagination link to the next page
     # in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
     # call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but
@@ -429,7 +428,6 @@ def get_all_notifications_for_service(service_id):
 @service_blueprint.route('/<uuid:service_id>/notifications/<uuid:notification_id>', methods=['GET'])
 def get_notification_for_service(service_id, notification_id):
     notification = notifications_dao.get_notification_with_personalisation(
         service_id,
         notification_id,