From b4a68ada2d197f2801728b7299582bdb1e1c6b49 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 22 Nov 2021 12:33:36 +0000 Subject: [PATCH] Remove volume restriction on timeout-notifications This (mostly) reverts [1], which we will replace with something a bit better in the next commits. We keep the optimisation [2] and the change that locks the notifications to update query [3]. We can't really add any tests for the "lock" feature as it's there to defend against a race condition that's hard to reproduce in a test. It's not a critical feature, though. [1]: https://github.com/alphagov/notifications-api/pull/3360/files [2]: https://github.com/alphagov/notifications-api/pull/3360/files#diff-3f7bdffd1a82923678f3855d8b0d7ad03fe4537224d09ca354999484de9e5420R477 [3]: https://github.com/alphagov/notifications-api/pull/3360/files#diff-3f7bdffd1a82923678f3855d8b0d7ad03fe4537224d09ca354999484de9e5420R484 --- app/celery/nightly_tasks.py | 44 ++++++++++++++---------------------- app/dao/notifications_dao.py | 6 +---- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index beb484f89..267839fa2 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -130,35 +130,25 @@ def delete_letter_notifications_older_than_retention(): @notify_celery.task(name='timeout-sending-notifications') @cronitor('timeout-sending-notifications') def timeout_notifications(): - # TEMPORARY: re-run the following code over small batches of notifications - # so that we can cope with a high volume that need processing. We've changed - # dao_timeout_notifications to return up to 100K notifications, so this task - # will operate on up to 500K - normally we only get around 20K. - for _ in range(0, 5): - technical_failure_notifications, temporary_failure_notifications = \ - dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) + technical_failure_notifications, temporary_failure_notifications = \ + dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) - notifications = technical_failure_notifications + temporary_failure_notifications - for notification in notifications: - # queue callback task only if the service_callback_api exists - service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501 - if service_callback_api: - encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api) - send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], - queue=QueueNames.CALLBACKS) + notifications = technical_failure_notifications + temporary_failure_notifications + for notification in notifications: + # queue callback task only if the service_callback_api exists + service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501 + if service_callback_api: + encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api) + send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], + queue=QueueNames.CALLBACKS) - current_app.logger.info( - "Timeout period reached for {} notifications, status has been updated.".format(len(notifications))) - if technical_failure_notifications: - message = "{} notifications have been updated to technical-failure because they " \ - "have timed out and are still in created.Notification ids: {}".format( - len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications]) - raise NotificationTechnicalFailureException(message) - - if len(notifications) < 100000: - return - - raise RuntimeError("Some notifications may still be in sending.") + current_app.logger.info( + "Timeout period reached for {} notifications, status has been updated.".format(len(notifications))) + if technical_failure_notifications: + message = "{} notifications have been updated to technical-failure because they " \ + "have timed out and are still in created.Notification ids: {}".format( + len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications]) + raise NotificationTechnicalFailureException(message) @notify_celery.task(name="delete-inbound-sms") diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index b7047c989..0c93752dd 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -467,15 +467,11 @@ def dao_delete_notifications_by_id(notification_id): def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at): - # TEMPORARY: limit the notifications to 100K as otherwise we - # see an issues where the task vanishes after it starts executing - # - we believe this is a OOM error but there are no logs. From - # experimentation we've found we can safely process up to 100K. notifications = Notification.query.filter( Notification.created_at < timeout_start, Notification.status.in_(current_statuses), Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) - ).limit(100000).all() + ).all() Notification.query.filter( Notification.created_at < timeout_start,