Merge pull request #3360 from alphagov/test-limit-timeout-notifications

Optimise query to get notifications to "time out"
This commit is contained in:
Ben Thorner
2021-11-09 15:54:04 +00:00
committed by GitHub
2 changed files with 36 additions and 20 deletions

View File

@@ -130,25 +130,35 @@ def delete_letter_notifications_older_than_retention():
# Celery task registered as 'timeout-sending-notifications'. It calls
# dao_timeout_notifications with the configured SENDING_NOTIFICATIONS_TIMEOUT_PERIOD,
# queues a delivery-status callback for each returned notification whose service
# has a callback API, logs how many notifications were handled, and raises
# NotificationTechnicalFailureException when any technical-failure notifications
# were returned.
# NOTE(review): this is a diff hunk whose +/- markers and indentation are missing,
# so the removed and the added version of the body appear interleaved below.
@notify_celery.task(name='timeout-sending-notifications')
@cronitor('timeout-sending-notifications')
def timeout_notifications():
# NOTE(review): the next two lines appear to be the removed, pre-loop form of the
# DAO call; the same call is repeated further down under the `for` loop.
technical_failure_notifications, temporary_failure_notifications = \
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
# TEMPORARY: re-run the following code over small batches of notifications
# so that we can cope with a high volume that need processing. We've changed
# dao_timeout_notifications to return up to 100K notifications, so this task
# will operate on up to 500K - normally we only get around 20K.
for _ in range(0, 5):
technical_failure_notifications, temporary_failure_notifications = \
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
# NOTE(review): from here to the first `raise` looks like the removed side of the
# diff; the same statements follow again (with `# noqa: E501` added) as the
# added side - confirm against the original commit.
notifications = technical_failure_notifications + temporary_failure_notifications
for notification in notifications:
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
if service_callback_api:
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
queue=QueueNames.CALLBACKS)
notifications = technical_failure_notifications + temporary_failure_notifications
for notification in notifications:
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)  # noqa: E501
if service_callback_api:
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
queue=QueueNames.CALLBACKS)
current_app.logger.info(
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
if technical_failure_notifications:
message = "{} notifications have been updated to technical-failure because they " \
"have timed out and are still in created.Notification ids: {}".format(
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
raise NotificationTechnicalFailureException(message)
current_app.logger.info(
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
if technical_failure_notifications:
message = "{} notifications have been updated to technical-failure because they " \
"have timed out and are still in created.Notification ids: {}".format(
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
raise NotificationTechnicalFailureException(message)
# Stop once a batch comes back with fewer than 100000 notifications.
# NOTE(review): presumably this means the DAO's 100K limit was not hit and
# nothing is left to process - confirm against dao_timeout_notifications.
if len(notifications) < 100000:
return
# NOTE(review): presumably sits after the loop, i.e. reached only when every one
# of the five batches was full - indentation is lost here, TODO confirm.
raise RuntimeError("Some notifications may still be in sending.")
@notify_celery.task(name="delete-inbound-sms")

View File

@@ -467,15 +467,21 @@ def dao_delete_notifications_by_id(notification_id):
def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at):
# TEMPORARY: limit the notifications to 100K as otherwise we
# see an issue where the task vanishes after it starts executing
# - we believe this is an OOM error but there are no logs. From
# experimentation we've found we can safely process up to 100K.
notifications = Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type != LETTER_TYPE
).all()
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
).limit(100000).all()
Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type != LETTER_TYPE
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]),
Notification.id.in_([n.id for n in notifications]),
).update(
{'status': new_status, 'updated_at': updated_at},
synchronize_session=False