Remove volume restriction on timeout-notifications

This (mostly) reverts [1], which we will replace with something a
bit better in the next commits. We keep the optimisation [2] and
the change that locks the notifications to update query [3].

We can't really add any tests for the "lock" feature as it's there
to defend against a race condition that's hard to reproduce in a
test. It's not a critical feature, though.

[1]: https://github.com/alphagov/notifications-api/pull/3360/files
[2]: https://github.com/alphagov/notifications-api/pull/3360/files#diff-3f7bdffd1a82923678f3855d8b0d7ad03fe4537224d09ca354999484de9e5420R477
[3]: https://github.com/alphagov/notifications-api/pull/3360/files#diff-3f7bdffd1a82923678f3855d8b0d7ad03fe4537224d09ca354999484de9e5420R484
This commit is contained in:
Ben Thorner
2021-11-22 12:33:36 +00:00
parent 2b6a550cdc
commit b4a68ada2d
2 changed files with 18 additions and 32 deletions

View File

@@ -130,35 +130,25 @@ def delete_letter_notifications_older_than_retention():
@notify_celery.task(name='timeout-sending-notifications')
@cronitor('timeout-sending-notifications')
def timeout_notifications():
# TEMPORARY: re-run the following code over small batches of notifications
# so that we can cope with a high volume that need processing. We've changed
# dao_timeout_notifications to return up to 100K notifications, so this task
# will operate on up to 500K - normally we only get around 20K.
for _ in range(0, 5):
technical_failure_notifications, temporary_failure_notifications = \
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
technical_failure_notifications, temporary_failure_notifications = \
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
notifications = technical_failure_notifications + temporary_failure_notifications
for notification in notifications:
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501
if service_callback_api:
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
queue=QueueNames.CALLBACKS)
notifications = technical_failure_notifications + temporary_failure_notifications
for notification in notifications:
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501
if service_callback_api:
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
queue=QueueNames.CALLBACKS)
current_app.logger.info(
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
if technical_failure_notifications:
message = "{} notifications have been updated to technical-failure because they " \
"have timed out and are still in created.Notification ids: {}".format(
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
raise NotificationTechnicalFailureException(message)
if len(notifications) < 100000:
return
raise RuntimeError("Some notifications may still be in sending.")
current_app.logger.info(
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
if technical_failure_notifications:
message = "{} notifications have been updated to technical-failure because they " \
"have timed out and are still in created.Notification ids: {}".format(
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
raise NotificationTechnicalFailureException(message)
@notify_celery.task(name="delete-inbound-sms")

View File

@@ -467,15 +467,11 @@ def dao_delete_notifications_by_id(notification_id):
def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at):
# TEMPORARY: limit the notifications to 100K as otherwise we
# see an issues where the task vanishes after it starts executing
# - we believe this is a OOM error but there are no logs. From
# experimentation we've found we can safely process up to 100K.
notifications = Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
).limit(100000).all()
).all()
Notification.query.filter(
Notification.created_at < timeout_start,