From 3bcaf8330e142d4484831027eed455facd48d8ea Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 16:39:55 +0000 Subject: [PATCH 1/7] Simplify comment for DAO timeout function --- app/dao/notifications_dao.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 2ad80a226..15daca241 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -491,13 +491,7 @@ def dao_delete_notifications_by_id(notification_id): def dao_timeout_notifications(timeout_period_in_seconds): """ - Timeout SMS and email notifications by the following rules: - - the notification was sent to the provider but there was not a delivery receipt - sending -> temporary-failure - pending -> temporary-failure - - Letter notifications are not timed out + Set email and SMS notifications (only) to "temporary-failure" status. """ timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds) updated_at = datetime.utcnow() From b81a66da504e1ce11ed887336cd0b2dc7742e0da Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 16:48:30 +0000 Subject: [PATCH 2/7] Fix assertions in tests for timeout DAO function Previously most of the assertions were being run *before* we had actually called the function. There was also a redundant block of assertions that just asserted the initial state of the test data. 
--- .../notification_dao/test_notification_dao.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index f81899c21..8e5f495e2 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -671,16 +671,13 @@ def test_dao_timeout_notifications(sample_template): pending = create_notification(sample_template, status='pending') delivered = create_notification(sample_template, status='delivered') - assert Notification.query.get(created.id).status == 'created' - assert Notification.query.get(sending.id).status == 'sending' - assert Notification.query.get(pending.id).status == 'pending' - assert Notification.query.get(delivered.id).status == 'delivered' temporary_failure_notifications = dao_timeout_notifications(1) + + assert len(temporary_failure_notifications) == 2 assert Notification.query.get(created.id).status == 'created' assert Notification.query.get(sending.id).status == 'temporary-failure' assert Notification.query.get(pending.id).status == 'temporary-failure' assert Notification.query.get(delivered.id).status == 'delivered' - assert len(temporary_failure_notifications) == 2 def test_dao_timeout_notifications_only_updates_for_older_notifications(sample_template): @@ -690,12 +687,13 @@ def test_dao_timeout_notifications_only_updates_for_older_notifications(sample_t pending = create_notification(sample_template, status='pending') delivered = create_notification(sample_template, status='delivered') + temporary_failure_notifications = dao_timeout_notifications(1) + + assert len(temporary_failure_notifications) == 0 assert Notification.query.get(created.id).status == 'created' assert Notification.query.get(sending.id).status == 'sending' assert Notification.query.get(pending.id).status == 'pending' assert Notification.query.get(delivered.id).status == 
'delivered' - temporary_failure_notifications = dao_timeout_notifications(1) - assert len(temporary_failure_notifications) == 0 def test_dao_timeout_notifications_doesnt_affect_letters(sample_letter_template): @@ -705,12 +703,13 @@ def test_dao_timeout_notifications_doesnt_affect_letters(sample_letter_template) pending = create_notification(sample_letter_template, status='pending') delivered = create_notification(sample_letter_template, status='delivered') + temporary_failure_notifications = dao_timeout_notifications(1) + + assert len(temporary_failure_notifications) == 0 assert Notification.query.get(created.id).status == 'created' assert Notification.query.get(sending.id).status == 'sending' assert Notification.query.get(pending.id).status == 'pending' assert Notification.query.get(delivered.id).status == 'delivered' - temporary_failure_notifications = dao_timeout_notifications(1) - assert len(temporary_failure_notifications) == 0 def test_should_return_notifications_excluding_jobs_by_default(sample_template, sample_job, sample_api_key): From 76aeab24ce25b9a221dc6b7a5034e6e766467b96 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 16:56:21 +0000 Subject: [PATCH 3/7] Rewrite DAO timeout method to take cutoff_time Previously we specified the period and calculated the cutoff time in the function. Passing it in means we can run the method multiple times and avoid getting "new" notifications to time out in the time it takes to process each batch. 
--- app/celery/nightly_tasks.py | 7 +++++-- app/dao/notifications_dao.py | 10 +++++----- tests/app/celery/test_nightly_tasks.py | 7 ++----- .../app/dao/notification_dao/test_notification_dao.py | 6 +++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index 92d401f46..54714922f 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -117,9 +117,12 @@ def timeout_notifications(): # so that we can cope with a high volume that need processing. We've changed # dao_timeout_notifications to return up to 100K notifications, so this task # will operate on up to 500K - normally we only get around 20K. + cutoff_time = datetime.utcnow() - timedelta( + seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + ) + for _ in range(0, 5): - notifications = \ - dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) + notifications = dao_timeout_notifications(cutoff_time) for notification in notifications: check_and_queue_callback_task(notification) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 15daca241..acfa286e3 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -489,11 +489,11 @@ def dao_delete_notifications_by_id(notification_id): ).delete(synchronize_session='fetch') -def dao_timeout_notifications(timeout_period_in_seconds): +def dao_timeout_notifications(cutoff_time): """ - Set email and SMS notifications (only) to "temporary-failure" status. + Set email and SMS notifications (only) to "temporary-failure" status + if they're still sending from before the specified cutoff_time. 
""" - timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds) updated_at = datetime.utcnow() current_statuses = [NOTIFICATION_SENDING, NOTIFICATION_PENDING] new_status = NOTIFICATION_TEMPORARY_FAILURE @@ -503,13 +503,13 @@ def dao_timeout_notifications(timeout_period_in_seconds): # - we believe this is a OOM error but there are no logs. From # experimentation we've found we can safely process up to 100K. notifications = Notification.query.filter( - Notification.created_at < timeout_start, + Notification.created_at < cutoff_time, Notification.status.in_(current_statuses), Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) ).limit(100000).all() Notification.query.filter( - Notification.created_at < timeout_start, + Notification.created_at < cutoff_time, Notification.status.in_(current_statuses), Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]), Notification.id.in_([n.id for n in notifications]), diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py index 7a6167aa2..847a27ce9 100644 --- a/tests/app/celery/test_nightly_tasks.py +++ b/tests/app/celery/test_nightly_tasks.py @@ -161,17 +161,14 @@ def test_delete_letter_notifications_older_than_retention_calls_child_task(notif mocked.assert_called_once_with('letter') +@freeze_time("2021-12-13T10:00") def test_timeout_notifications(mocker, sample_notification): mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task') mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications') mock_dao.return_value = [sample_notification] timeout_notifications() - - mock_dao.assert_called_once_with( - current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - ) - + mock_dao.assert_called_once_with(datetime.fromisoformat('2021-12-10T10:00')) mock_update.assert_called_once_with(sample_notification) diff --git a/tests/app/dao/notification_dao/test_notification_dao.py 
b/tests/app/dao/notification_dao/test_notification_dao.py index 8e5f495e2..e09bb26d8 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -671,7 +671,7 @@ def test_dao_timeout_notifications(sample_template): pending = create_notification(sample_template, status='pending') delivered = create_notification(sample_template, status='delivered') - temporary_failure_notifications = dao_timeout_notifications(1) + temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) assert len(temporary_failure_notifications) == 2 assert Notification.query.get(created.id).status == 'created' @@ -687,7 +687,7 @@ def test_dao_timeout_notifications_only_updates_for_older_notifications(sample_t pending = create_notification(sample_template, status='pending') delivered = create_notification(sample_template, status='delivered') - temporary_failure_notifications = dao_timeout_notifications(1) + temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) assert len(temporary_failure_notifications) == 0 assert Notification.query.get(created.id).status == 'created' @@ -703,7 +703,7 @@ def test_dao_timeout_notifications_doesnt_affect_letters(sample_letter_template) pending = create_notification(sample_letter_template, status='pending') delivered = create_notification(sample_letter_template, status='delivered') - temporary_failure_notifications = dao_timeout_notifications(1) + temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) assert len(temporary_failure_notifications) == 0 assert Notification.query.get(created.id).status == 'created' From c8ebb365d4b058bce0265253afe295f8e3b0836c Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 16:59:25 +0000 Subject: [PATCH 4/7] Make limit of DAO timeout function more obvious We're going to iterate how we use the function with a limit, so we shouldn't say it's "temporary" anymore. 
We don't need to change the default, but having it in the function parameters makes it easier to see the function doesn't time out all notifications, just some. --- app/dao/notifications_dao.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index acfa286e3..58d7dff47 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -489,7 +489,7 @@ def dao_delete_notifications_by_id(notification_id): ).delete(synchronize_session='fetch') -def dao_timeout_notifications(cutoff_time): +def dao_timeout_notifications(cutoff_time, limit=100000): """ Set email and SMS notifications (only) to "temporary-failure" status if they're still sending from before the specified cutoff_time. @@ -498,15 +498,11 @@ def dao_timeout_notifications(cutoff_time): current_statuses = [NOTIFICATION_SENDING, NOTIFICATION_PENDING] new_status = NOTIFICATION_TEMPORARY_FAILURE - # TEMPORARY: limit the notifications to 100K as otherwise we - # see an issues where the task vanishes after it starts executing - # - we believe this is a OOM error but there are no logs. From - # experimentation we've found we can safely process up to 100K. notifications = Notification.query.filter( Notification.created_at < cutoff_time, Notification.status.in_(current_statuses), Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) - ).limit(100000).all() + ).limit(limit).all() Notification.query.filter( Notification.created_at < cutoff_time, From 2adaaac3aec2a83c5c5e87e97d0aa5324baae373 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 17:00:41 +0000 Subject: [PATCH 5/7] Remove redundant conditions for update query Filtering by ID is enough, noting the other conditions were the same between both queries. 
--- app/dao/notifications_dao.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 58d7dff47..da75f285a 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -505,9 +505,6 @@ def dao_timeout_notifications(cutoff_time, limit=100000): ).limit(limit).all() Notification.query.filter( - Notification.created_at < cutoff_time, - Notification.status.in_(current_statuses), - Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]), Notification.id.in_([n.id for n in notifications]), ).update( {'status': new_status, 'updated_at': updated_at}, From 87cd40d00a8df6e8c0d84f9c93a07c814e63c6c1 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 17:09:22 +0000 Subject: [PATCH 6/7] Scale timeout task to work on arbitrary volumes Previously this was limited to 500K notifications. While we don't expect to reach this limit, it's not impossible e.g. if we had a repeat of the incident where one of our providers stopped sending us status updates. Although that's not great, it's worse if our code can't cope with the unexpectedly high volume. This reuses the technique we have elsewhere [1] to keep processing in batches until there's nothing left. Specifying a cutoff point means the total amount of work to do can't keep growing. 
[1]: https://github.com/alphagov/notifications-api/blob/2fb432adaf648e3c6a4201ac0a1d4b966501077e/app/dao/notifications_dao.py#L441 --- app/celery/nightly_tasks.py | 13 +++---------- tests/app/celery/test_nightly_tasks.py | 11 ++++++++--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index 54714922f..15f96b4e3 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -113,15 +113,13 @@ def delete_letter_notifications_older_than_retention(): @notify_celery.task(name='timeout-sending-notifications') @cronitor('timeout-sending-notifications') def timeout_notifications(): - # TEMPORARY: re-run the following code over small batches of notifications - # so that we can cope with a high volume that need processing. We've changed - # dao_timeout_notifications to return up to 100K notifications, so this task - # will operate on up to 500K - normally we only get around 20K. + notifications = ['dummy value so len() > 0'] + cutoff_time = datetime.utcnow() - timedelta( seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') ) - for _ in range(0, 5): + while len(notifications) > 0: notifications = dao_timeout_notifications(cutoff_time) for notification in notifications: @@ -130,11 +128,6 @@ def timeout_notifications(): current_app.logger.info( "Timeout period reached for {} notifications, status has been updated.".format(len(notifications))) - if len(notifications) < 100000: - return - - raise RuntimeError("Some notifications may still be in sending.") - @notify_celery.task(name="delete-inbound-sms") @cronitor("delete-inbound-sms") diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py index 847a27ce9..ade428b5f 100644 --- a/tests/app/celery/test_nightly_tasks.py +++ b/tests/app/celery/test_nightly_tasks.py @@ -165,11 +165,16 @@ def test_delete_letter_notifications_older_than_retention_calls_child_task(notif def 
test_timeout_notifications(mocker, sample_notification): mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task') mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications') - mock_dao.return_value = [sample_notification] + + mock_dao.side_effect = [ + [sample_notification], # first batch to time out + [sample_notification], # second batch + [] # nothing left to time out + ] timeout_notifications() - mock_dao.assert_called_once_with(datetime.fromisoformat('2021-12-10T10:00')) - mock_update.assert_called_once_with(sample_notification) + mock_dao.assert_called_with(datetime.fromisoformat('2021-12-10T10:00')) + assert mock_update.mock_calls == [call(sample_notification), call(sample_notification)] def test_delete_inbound_sms_calls_child_task(notify_api, mocker): From c1f0c24d82422e26be93e20ba99eb977465afe54 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 13 Dec 2021 17:17:41 +0000 Subject: [PATCH 7/7] Trim down tests for DAO timeout function a bit The first test is enough to cover that "created" and "delivered" notifications aren't affected by this function. 
--- tests/app/dao/notification_dao/test_notification_dao.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index e09bb26d8..bafc9861c 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -682,34 +682,26 @@ def test_dao_timeout_notifications(sample_template): def test_dao_timeout_notifications_only_updates_for_older_notifications(sample_template): with freeze_time(datetime.utcnow() + timedelta(minutes=10)): - created = create_notification(sample_template, status='created') sending = create_notification(sample_template, status='sending') pending = create_notification(sample_template, status='pending') - delivered = create_notification(sample_template, status='delivered') temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) assert len(temporary_failure_notifications) == 0 - assert Notification.query.get(created.id).status == 'created' assert Notification.query.get(sending.id).status == 'sending' assert Notification.query.get(pending.id).status == 'pending' - assert Notification.query.get(delivered.id).status == 'delivered' def test_dao_timeout_notifications_doesnt_affect_letters(sample_letter_template): with freeze_time(datetime.utcnow() - timedelta(minutes=2)): - created = create_notification(sample_letter_template, status='created') sending = create_notification(sample_letter_template, status='sending') pending = create_notification(sample_letter_template, status='pending') - delivered = create_notification(sample_letter_template, status='delivered') temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) assert len(temporary_failure_notifications) == 0 - assert Notification.query.get(created.id).status == 'created' assert Notification.query.get(sending.id).status == 'sending' assert 
Notification.query.get(pending.id).status == 'pending' - assert Notification.query.get(delivered.id).status == 'delivered' def test_should_return_notifications_excluding_jobs_by_default(sample_template, sample_job, sample_api_key):