mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-20 23:41:17 -05:00
Merge pull request #3398 from alphagov/infinity-timeout-180344153
Scale timeout task to work on arbitrary volumes
This commit is contained in:
@@ -113,13 +113,14 @@ def delete_letter_notifications_older_than_retention():
|
||||
@notify_celery.task(name='timeout-sending-notifications')
|
||||
@cronitor('timeout-sending-notifications')
|
||||
def timeout_notifications():
|
||||
# TEMPORARY: re-run the following code over small batches of notifications
|
||||
# so that we can cope with a high volume that need processing. We've changed
|
||||
# dao_timeout_notifications to return up to 100K notifications, so this task
|
||||
# will operate on up to 500K - normally we only get around 20K.
|
||||
for _ in range(0, 5):
|
||||
notifications = \
|
||||
dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
|
||||
notifications = ['dummy value so len() > 0']
|
||||
|
||||
cutoff_time = datetime.utcnow() - timedelta(
|
||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||
)
|
||||
|
||||
while len(notifications) > 0:
|
||||
notifications = dao_timeout_notifications(cutoff_time)
|
||||
|
||||
for notification in notifications:
|
||||
check_and_queue_callback_task(notification)
|
||||
@@ -127,11 +128,6 @@ def timeout_notifications():
|
||||
current_app.logger.info(
|
||||
"Timeout period reached for {} notifications, status has been updated.".format(len(notifications)))
|
||||
|
||||
if len(notifications) < 100000:
|
||||
return
|
||||
|
||||
raise RuntimeError("Some notifications may still be in sending.")
|
||||
|
||||
|
||||
@notify_celery.task(name="delete-inbound-sms")
|
||||
@cronitor("delete-inbound-sms")
|
||||
|
||||
@@ -489,35 +489,22 @@ def dao_delete_notifications_by_id(notification_id):
|
||||
).delete(synchronize_session='fetch')
|
||||
|
||||
|
||||
def dao_timeout_notifications(timeout_period_in_seconds):
|
||||
def dao_timeout_notifications(cutoff_time, limit=100000):
|
||||
"""
|
||||
Timeout SMS and email notifications by the following rules:
|
||||
|
||||
the notification was sent to the provider but there was not a delivery receipt
|
||||
sending -> temporary-failure
|
||||
pending -> temporary-failure
|
||||
|
||||
Letter notifications are not timed out
|
||||
Set email and SMS notifications (only) to "temporary-failure" status
|
||||
if they're still sending from before the specified cutoff_time.
|
||||
"""
|
||||
timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds)
|
||||
updated_at = datetime.utcnow()
|
||||
current_statuses = [NOTIFICATION_SENDING, NOTIFICATION_PENDING]
|
||||
new_status = NOTIFICATION_TEMPORARY_FAILURE
|
||||
|
||||
# TEMPORARY: limit the notifications to 100K as otherwise we
|
||||
# see an issues where the task vanishes after it starts executing
|
||||
# - we believe this is a OOM error but there are no logs. From
|
||||
# experimentation we've found we can safely process up to 100K.
|
||||
notifications = Notification.query.filter(
|
||||
Notification.created_at < timeout_start,
|
||||
Notification.created_at < cutoff_time,
|
||||
Notification.status.in_(current_statuses),
|
||||
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
|
||||
).limit(100000).all()
|
||||
).limit(limit).all()
|
||||
|
||||
Notification.query.filter(
|
||||
Notification.created_at < timeout_start,
|
||||
Notification.status.in_(current_statuses),
|
||||
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]),
|
||||
Notification.id.in_([n.id for n in notifications]),
|
||||
).update(
|
||||
{'status': new_status, 'updated_at': updated_at},
|
||||
|
||||
@@ -161,18 +161,20 @@ def test_delete_letter_notifications_older_than_retention_calls_child_task(notif
|
||||
mocked.assert_called_once_with('letter')
|
||||
|
||||
|
||||
@freeze_time("2021-12-13T10:00")
|
||||
def test_timeout_notifications(mocker, sample_notification):
|
||||
mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task')
|
||||
mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications')
|
||||
mock_dao.return_value = [sample_notification]
|
||||
|
||||
mock_dao.side_effect = [
|
||||
[sample_notification], # first batch to time out
|
||||
[sample_notification], # second batch
|
||||
[] # nothing left to time out
|
||||
]
|
||||
|
||||
timeout_notifications()
|
||||
|
||||
mock_dao.assert_called_once_with(
|
||||
current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
||||
)
|
||||
|
||||
mock_update.assert_called_once_with(sample_notification)
|
||||
mock_dao.assert_called_with(datetime.fromisoformat('2021-12-10T10:00'))
|
||||
assert mock_update.mock_calls == [call(sample_notification), call(sample_notification)]
|
||||
|
||||
|
||||
def test_delete_inbound_sms_calls_child_task(notify_api, mocker):
|
||||
|
||||
@@ -671,46 +671,37 @@ def test_dao_timeout_notifications(sample_template):
|
||||
pending = create_notification(sample_template, status='pending')
|
||||
delivered = create_notification(sample_template, status='delivered')
|
||||
|
||||
assert Notification.query.get(created.id).status == 'created'
|
||||
assert Notification.query.get(sending.id).status == 'sending'
|
||||
assert Notification.query.get(pending.id).status == 'pending'
|
||||
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||
temporary_failure_notifications = dao_timeout_notifications(1)
|
||||
temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow())
|
||||
|
||||
assert len(temporary_failure_notifications) == 2
|
||||
assert Notification.query.get(created.id).status == 'created'
|
||||
assert Notification.query.get(sending.id).status == 'temporary-failure'
|
||||
assert Notification.query.get(pending.id).status == 'temporary-failure'
|
||||
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||
assert len(temporary_failure_notifications) == 2
|
||||
|
||||
|
||||
def test_dao_timeout_notifications_only_updates_for_older_notifications(sample_template):
|
||||
with freeze_time(datetime.utcnow() + timedelta(minutes=10)):
|
||||
created = create_notification(sample_template, status='created')
|
||||
sending = create_notification(sample_template, status='sending')
|
||||
pending = create_notification(sample_template, status='pending')
|
||||
delivered = create_notification(sample_template, status='delivered')
|
||||
|
||||
assert Notification.query.get(created.id).status == 'created'
|
||||
temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow())
|
||||
|
||||
assert len(temporary_failure_notifications) == 0
|
||||
assert Notification.query.get(sending.id).status == 'sending'
|
||||
assert Notification.query.get(pending.id).status == 'pending'
|
||||
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||
temporary_failure_notifications = dao_timeout_notifications(1)
|
||||
assert len(temporary_failure_notifications) == 0
|
||||
|
||||
|
||||
def test_dao_timeout_notifications_doesnt_affect_letters(sample_letter_template):
|
||||
with freeze_time(datetime.utcnow() - timedelta(minutes=2)):
|
||||
created = create_notification(sample_letter_template, status='created')
|
||||
sending = create_notification(sample_letter_template, status='sending')
|
||||
pending = create_notification(sample_letter_template, status='pending')
|
||||
delivered = create_notification(sample_letter_template, status='delivered')
|
||||
|
||||
assert Notification.query.get(created.id).status == 'created'
|
||||
temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow())
|
||||
|
||||
assert len(temporary_failure_notifications) == 0
|
||||
assert Notification.query.get(sending.id).status == 'sending'
|
||||
assert Notification.query.get(pending.id).status == 'pending'
|
||||
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||
temporary_failure_notifications = dao_timeout_notifications(1)
|
||||
assert len(temporary_failure_notifications) == 0
|
||||
|
||||
|
||||
def test_should_return_notifications_excluding_jobs_by_default(sample_template, sample_job, sample_api_key):
|
||||
|
||||
Reference in New Issue
Block a user