diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index beb484f89..267839fa2 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -130,35 +130,25 @@ def delete_letter_notifications_older_than_retention(): @notify_celery.task(name='timeout-sending-notifications') @cronitor('timeout-sending-notifications') def timeout_notifications(): - # TEMPORARY: re-run the following code over small batches of notifications - # so that we can cope with a high volume that need processing. We've changed - # dao_timeout_notifications to return up to 100K notifications, so this task - # will operate on up to 500K - normally we only get around 20K. - for _ in range(0, 5): - technical_failure_notifications, temporary_failure_notifications = \ - dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) + technical_failure_notifications, temporary_failure_notifications = \ + dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) - notifications = technical_failure_notifications + temporary_failure_notifications - for notification in notifications: - # queue callback task only if the service_callback_api exists - service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501 - if service_callback_api: - encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api) - send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], - queue=QueueNames.CALLBACKS) + notifications = technical_failure_notifications + temporary_failure_notifications + for notification in notifications: + # queue callback task only if the service_callback_api exists + service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501 + if service_callback_api: + encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api) + send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification], + queue=QueueNames.CALLBACKS) - current_app.logger.info( - "Timeout period reached for {} notifications, status has been updated.".format(len(notifications))) - if technical_failure_notifications: - message = "{} notifications have been updated to technical-failure because they " \ - "have timed out and are still in created.Notification ids: {}".format( - len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications]) - raise NotificationTechnicalFailureException(message) - - if len(notifications) < 100000: - return - - raise RuntimeError("Some notifications may still be in sending.") + current_app.logger.info( + "Timeout period reached for {} notifications, status has been updated.".format(len(notifications))) + if technical_failure_notifications: + message = "{} notifications have been updated to technical-failure because they " \ + "have timed out and are still in created.Notification ids: {}".format( + len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications]) + raise NotificationTechnicalFailureException(message) @notify_celery.task(name="delete-inbound-sms") diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index b7047c989..0c93752dd 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -467,15 +467,11 @@ def dao_delete_notifications_by_id(notification_id): def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at): - # TEMPORARY: limit the notifications to 100K as otherwise we - # see an issues where the task vanishes after it starts executing - # - we believe this is a OOM error but there are no logs. From - # experimentation we've found we can safely process up to 100K. notifications = Notification.query.filter( Notification.created_at < timeout_start, Notification.status.in_(current_statuses), Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) - ).limit(100000).all() + ).all() Notification.query.filter( Notification.created_at < timeout_start,