mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-31 15:15:38 -05:00
Scale timeout task to work on arbitrary volumes
Previously this was limited to 500K notifications. While we don't
expect to reach this limit, it's not impossible e.g. if we had a
repeat of the incident where one of our providers stopped sending
us status updates. Although that's not great, it's worse if our
code can't cope with the unexpectedly high volume.
This reuses the technique we have elsewhere [1] to keep processing
in batches until there's nothing left. Specifying a cutoff point
means the total amount of work to do can't keep growing.
[1]: 2fb432adaf/app/dao/notifications_dao.py (L441)
This commit is contained in:
@@ -113,15 +113,13 @@ def delete_letter_notifications_older_than_retention():
|
|||||||
@notify_celery.task(name='timeout-sending-notifications')
|
@notify_celery.task(name='timeout-sending-notifications')
|
||||||
@cronitor('timeout-sending-notifications')
|
@cronitor('timeout-sending-notifications')
|
||||||
def timeout_notifications():
|
def timeout_notifications():
|
||||||
# TEMPORARY: re-run the following code over small batches of notifications
|
notifications = ['dummy value so len() > 0']
|
||||||
# so that we can cope with a high volume that need processing. We've changed
|
|
||||||
# dao_timeout_notifications to return up to 100K notifications, so this task
|
|
||||||
# will operate on up to 500K - normally we only get around 20K.
|
|
||||||
cutoff_time = datetime.utcnow() - timedelta(
|
cutoff_time = datetime.utcnow() - timedelta(
|
||||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||||
)
|
)
|
||||||
|
|
||||||
for _ in range(0, 5):
|
while len(notifications) > 0:
|
||||||
notifications = dao_timeout_notifications(cutoff_time)
|
notifications = dao_timeout_notifications(cutoff_time)
|
||||||
|
|
||||||
for notification in notifications:
|
for notification in notifications:
|
||||||
@@ -130,11 +128,6 @@ def timeout_notifications():
|
|||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
||||||
|
|
||||||
if len(notifications) < 100000:
|
|
||||||
return
|
|
||||||
|
|
||||||
raise RuntimeError("Some notifications may still be in sending.")
|
|
||||||
|
|
||||||
|
|
||||||
@notify_celery.task(name="delete-inbound-sms")
|
@notify_celery.task(name="delete-inbound-sms")
|
||||||
@cronitor("delete-inbound-sms")
|
@cronitor("delete-inbound-sms")
|
||||||
|
|||||||
@@ -165,11 +165,16 @@ def test_delete_letter_notifications_older_than_retention_calls_child_task(notif
|
|||||||
def test_timeout_notifications(mocker, sample_notification):
|
def test_timeout_notifications(mocker, sample_notification):
|
||||||
mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task')
|
mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task')
|
||||||
mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications')
|
mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications')
|
||||||
mock_dao.return_value = [sample_notification]
|
|
||||||
|
mock_dao.side_effect = [
|
||||||
|
[sample_notification], # first batch to time out
|
||||||
|
[sample_notification], # second batch
|
||||||
|
[] # nothing left to time out
|
||||||
|
]
|
||||||
|
|
||||||
timeout_notifications()
|
timeout_notifications()
|
||||||
mock_dao.assert_called_once_with(datetime.fromisoformat('2021-12-10T10:00'))
|
mock_dao.assert_called_with(datetime.fromisoformat('2021-12-10T10:00'))
|
||||||
mock_update.assert_called_once_with(sample_notification)
|
assert mock_update.mock_calls == [call(sample_notification), call(sample_notification)]
|
||||||
|
|
||||||
|
|
||||||
def test_delete_inbound_sms_calls_child_task(notify_api, mocker):
|
def test_delete_inbound_sms_calls_child_task(notify_api, mocker):
|
||||||
|
|||||||
Reference in New Issue
Block a user