Optimise query to get notifications to "time out"

From experimenting in production we found a "!=" caused the engine
to use a sequential scan, whereas explicitly listing all the types
ensured an index scan was used.

We also found that querying for many (over 100K) items leads to
the task stalling - no logs, and no evidence of it running either -
so we also add a limit to the query.

Since the query now only returns a subset of notifications, we need
to ensure the subsequent "update" query operates on the same batch.
Also, as a temporary measure, we have a loop in the task code to
ensure it operates on the total set of notifications to "time out",
which we assume is less than 500K for the time being.
This commit is contained in:
Ben Thorner
2021-11-08 14:18:21 +00:00
parent 98b6c1d67d
commit 77c8c0a501
2 changed files with 36 additions and 20 deletions

View File

@@ -130,13 +130,18 @@ def delete_letter_notifications_older_than_retention():
@notify_celery.task(name='timeout-sending-notifications')
@cronitor('timeout-sending-notifications')
def timeout_notifications():
# TEMPORARY: re-run the following code over small batches of notifications
    # so that we can cope with a high volume that needs processing. We've changed
# dao_timeout_notifications to return up to 100K notifications, so this task
# will operate on up to 500K - normally we only get around 20K.
for _ in range(0, 5):
technical_failure_notifications, temporary_failure_notifications = \
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
notifications = technical_failure_notifications + temporary_failure_notifications
for notification in notifications:
# queue callback task only if the service_callback_api exists
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id)
service_callback_api = get_service_delivery_status_callback_api_for_service(service_id=notification.service_id) # noqa: E501
if service_callback_api:
encrypted_notification = create_delivery_status_callback_data(notification, service_callback_api)
send_delivery_status_to_service.apply_async([str(notification.id), encrypted_notification],
@@ -150,6 +155,11 @@ def timeout_notifications():
len(technical_failure_notifications), [str(x.id) for x in technical_failure_notifications])
raise NotificationTechnicalFailureException(message)
if len(notifications) == 0:
return
raise RuntimeError("Some notifications may still be in sending.")
@notify_celery.task(name="delete-inbound-sms")
@cronitor("delete-inbound-sms")

View File

@@ -467,15 +467,21 @@ def dao_delete_notifications_by_id(notification_id):
def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at):
# TEMPORARY: limit the notifications to 100K as otherwise we
    # see an issue where the task vanishes after it starts executing
    # - we believe this is an OOM error but there are no logs. From
# experimentation we've found we can safely process up to 100K.
notifications = Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type != LETTER_TYPE
).all()
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
).limit(100000).all()
Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type != LETTER_TYPE
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]),
Notification.id.in_([n.id for n in notifications]),
).update(
{'status': new_status, 'updated_at': updated_at},
synchronize_session=False