diff --git a/app/celery/nightly_tasks.py b/app/celery/nightly_tasks.py index 92d401f46..15f96b4e3 100644 --- a/app/celery/nightly_tasks.py +++ b/app/celery/nightly_tasks.py @@ -113,13 +113,14 @@ def delete_letter_notifications_older_than_retention(): @notify_celery.task(name='timeout-sending-notifications') @cronitor('timeout-sending-notifications') def timeout_notifications(): - # TEMPORARY: re-run the following code over small batches of notifications - # so that we can cope with a high volume that need processing. We've changed - # dao_timeout_notifications to return up to 100K notifications, so this task - # will operate on up to 500K - normally we only get around 20K. - for _ in range(0, 5): - notifications = \ - dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) + notifications = ['dummy value so len() > 0'] + + cutoff_time = datetime.utcnow() - timedelta( + seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') + ) + + while len(notifications) > 0: + notifications = dao_timeout_notifications(cutoff_time) for notification in notifications: check_and_queue_callback_task(notification) @@ -127,11 +128,6 @@ def timeout_notifications(): current_app.logger.info( "Timeout period reached for {} notifications, status has been updated.".format(len(notifications))) - if len(notifications) < 100000: - return - - raise RuntimeError("Some notifications may still be in sending.") - @notify_celery.task(name="delete-inbound-sms") @cronitor("delete-inbound-sms") diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 2ad80a226..da75f285a 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -489,35 +489,22 @@ def dao_delete_notifications_by_id(notification_id): ).delete(synchronize_session='fetch') -def dao_timeout_notifications(timeout_period_in_seconds): +def dao_timeout_notifications(cutoff_time, limit=100000): """ - Timeout SMS and email notifications by the following rules: - - the notification was sent to the provider but there was not a delivery receipt - sending -> temporary-failure - pending -> temporary-failure - - Letter notifications are not timed out + Set email and SMS notifications (only) to "temporary-failure" status + if they're still sending from before the specified cutoff_time. """ - timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds) updated_at = datetime.utcnow() current_statuses = [NOTIFICATION_SENDING, NOTIFICATION_PENDING] new_status = NOTIFICATION_TEMPORARY_FAILURE - # TEMPORARY: limit the notifications to 100K as otherwise we - # see an issues where the task vanishes after it starts executing - # - we believe this is a OOM error but there are no logs. From - # experimentation we've found we can safely process up to 100K. notifications = Notification.query.filter( - Notification.created_at < timeout_start, + Notification.created_at < cutoff_time, Notification.status.in_(current_statuses), Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) - ).limit(100000).all() + ).limit(limit).all() Notification.query.filter( - Notification.created_at < timeout_start, - Notification.status.in_(current_statuses), - Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]), Notification.id.in_([n.id for n in notifications]), ).update( {'status': new_status, 'updated_at': updated_at}, diff --git a/tests/app/celery/test_nightly_tasks.py b/tests/app/celery/test_nightly_tasks.py index 7a6167aa2..ade428b5f 100644 --- a/tests/app/celery/test_nightly_tasks.py +++ b/tests/app/celery/test_nightly_tasks.py @@ -161,18 +161,20 @@ def test_delete_letter_notifications_older_than_retention_calls_child_task(notif mocked.assert_called_once_with('letter') +@freeze_time("2021-12-13T10:00") def test_timeout_notifications(mocker, sample_notification): mock_update = mocker.patch('app.celery.nightly_tasks.check_and_queue_callback_task') mock_dao = mocker.patch('app.celery.nightly_tasks.dao_timeout_notifications') - mock_dao.return_value = [sample_notification] + + mock_dao.side_effect = [ + [sample_notification], # first batch to time out + [sample_notification], # second batch + [] # nothing left to time out + ] timeout_notifications() - - mock_dao.assert_called_once_with( - current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - ) - - mock_update.assert_called_once_with(sample_notification) + mock_dao.assert_called_with(datetime.fromisoformat('2021-12-10T10:00')) + assert mock_update.mock_calls == [call(sample_notification), call(sample_notification)] def test_delete_inbound_sms_calls_child_task(notify_api, mocker): diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index f81899c21..bafc9861c 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -671,46 +671,37 @@ def test_dao_timeout_notifications(sample_template): pending = create_notification(sample_template, status='pending') delivered = create_notification(sample_template, status='delivered') - assert Notification.query.get(created.id).status == 'created' - assert Notification.query.get(sending.id).status == 'sending' - assert Notification.query.get(pending.id).status == 'pending' - assert Notification.query.get(delivered.id).status == 'delivered' - temporary_failure_notifications = dao_timeout_notifications(1) + temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) + + assert len(temporary_failure_notifications) == 2 assert Notification.query.get(created.id).status == 'created' assert Notification.query.get(sending.id).status == 'temporary-failure' assert Notification.query.get(pending.id).status == 'temporary-failure' assert Notification.query.get(delivered.id).status == 'delivered' - assert len(temporary_failure_notifications) == 2 def test_dao_timeout_notifications_only_updates_for_older_notifications(sample_template): with freeze_time(datetime.utcnow() + timedelta(minutes=10)): - created = create_notification(sample_template, status='created') sending = create_notification(sample_template, status='sending') pending = create_notification(sample_template, status='pending') - delivered = create_notification(sample_template, status='delivered') - assert Notification.query.get(created.id).status == 'created' + temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) + + assert len(temporary_failure_notifications) == 0 assert Notification.query.get(sending.id).status == 'sending' assert Notification.query.get(pending.id).status == 'pending' - assert Notification.query.get(delivered.id).status == 'delivered' - temporary_failure_notifications = dao_timeout_notifications(1) - assert len(temporary_failure_notifications) == 0 def test_dao_timeout_notifications_doesnt_affect_letters(sample_letter_template): with freeze_time(datetime.utcnow() - timedelta(minutes=2)): - created = create_notification(sample_letter_template, status='created') sending = create_notification(sample_letter_template, status='sending') pending = create_notification(sample_letter_template, status='pending') - delivered = create_notification(sample_letter_template, status='delivered') - assert Notification.query.get(created.id).status == 'created' + temporary_failure_notifications = dao_timeout_notifications(datetime.utcnow()) + + assert len(temporary_failure_notifications) == 0 assert Notification.query.get(sending.id).status == 'sending' assert Notification.query.get(pending.id).status == 'pending' - assert Notification.query.get(delivered.id).status == 'delivered' - temporary_failure_notifications = dao_timeout_notifications(1) - assert len(temporary_failure_notifications) == 0 def test_should_return_notifications_excluding_jobs_by_default(sample_template, sample_job, sample_api_key):