diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py
index ef6b6f72d..c6ef22c71 100644
--- a/app/celery/nightly_tasks.py
+++ b/app/celery/nightly_tasks.py
@@ -130,25 +130,35 @@ def delete_letter_notifications_older_than_retention():
 @notify_celery.task(name='timeout-sending-notifications')
 @cronitor('timeout-sending-notifications')
 def timeout_notifications():
-    technical_failure_notifications, temporary_failure_notifications = \
-        dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
+    # TEMPORARY: re-run the following code over small batches of notifications
+    # so that we can cope with a high volume that need processing. We've changed
+    # dao_timeout_notifications to return up to 100K notifications, so this task
+    # will operate on up to 500K - normally we only get around 20K.
+    for _ in range(0, 5):
+        technical_failure_notifications, temporary_failure_notifications = \
+            dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
 
-    notifications = technical_failure_notifications + temporary_failure_notifications
-    for notification in notifications:
-        # queue callback task only if the service_callback_api exists
-        service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
-        if service_callback_api:
-            encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
-            send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
-                                                        queue=QueueNames.CALLBACKS)
+        notifications = technical_failure_notifications + temporary_failure_notifications
+        for notification in notifications:
+            # queue callback task only if the service_callback_api exists
+            service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)  # noqa: E501
+            if service_callback_api:
+                encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
+                send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
+                                                            queue=QueueNames.CALLBACKS)
 
-    current_app.logger.info(
-        "Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
-    if technical_failure_notifications:
-        message = "{} notifications have been updated to technical-failure because they " \
-                  "have timed out and are still in created.Notification ids: {}".format(
-                      len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
-        raise NotificationTechnicalFailureException(message)
+        current_app.logger.info(
+            "Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
+        if technical_failure_notifications:
+            message = "{} notifications have been updated to technical-failure because they " \
+                      "have timed out and are still in created.Notification ids: {}".format(
+                          len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
+            raise NotificationTechnicalFailureException(message)
+
+        if len(notifications) == 0:
+            return
+
+    raise RuntimeError("Some notifications may still be in sending.")
 
 
 @notify_celery.task(name="delete-inbound-sms")
diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py
index 559d1aabd..325cd076a 100644
--- a/app/dao/notifications_dao.py
+++ b/app/dao/notifications_dao.py
@@ -467,15 +467,21 @@ def dao_delete_notifications_by_id(notification_id):
 
 
 def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at):
+    # TEMPORARY: limit the notifications to 100K as otherwise we
+    # see an issues where the task vanishes after it starts executing
+    # - we believe this is a OOM error but there are no logs. From
+    # experimentation we've found we can safely process up to 100K.
     notifications = Notification.query.filter(
         Notification.created_at < timeout_start,
         Notification.status.in_(current_statuses),
-        Notification.notification_type != LETTER_TYPE
-    ).all()
+        Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
+    ).limit(100000).all()
+
     Notification.query.filter(
         Notification.created_at < timeout_start,
         Notification.status.in_(current_statuses),
-        Notification.notification_type != LETTER_TYPE
+        Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]),
+        Notification.id.in_([n.id for n in notifications]),
     ).update(
         {'status': new_status, 'updated_at': updated_at},
         synchronize_session=False