diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index f5700ea68..48a45b597 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -1,4 +1,3 @@ -import functools from datetime import datetime, timedelta from itertools import groupby from operator import attrgetter @@ -40,7 +39,6 @@ from app.models import ( NOTIFICATION_SENDING, NOTIFICATION_SENT, NOTIFICATION_STATUS_TYPES_COMPLETED, - NOTIFICATION_TECHNICAL_FAILURE, NOTIFICATION_TEMPORARY_FAILURE, SMS_TYPE, FactNotificationStatus, @@ -489,29 +487,6 @@ def dao_delete_notifications_by_id(notification_id): ).delete(synchronize_session='fetch') -def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at): - # TEMPORARY: limit the notifications to 100K as otherwise we - # see an issues where the task vanishes after it starts executing - # - we believe this is a OOM error but there are no logs. From - # experimentation we've found we can safely process up to 100K. - notifications = Notification.query.filter( - Notification.created_at < timeout_start, - Notification.status.in_(current_statuses), - Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) - ).limit(100000).all() - - Notification.query.filter( - Notification.created_at < timeout_start, - Notification.status.in_(current_statuses), - Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]), - Notification.id.in_([n.id for n in notifications]), - ).update( - {'status': new_status, 'updated_at': updated_at}, - synchronize_session=False - ) - return notifications - - def dao_check_notifications_still_in_created(minimum_age_in_seconds): min_created_at = datetime.utcnow() - timedelta(seconds=minimum_age_in_seconds) @@ -534,15 +509,31 @@ def dao_timeout_notifications(timeout_period_in_seconds): """ timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds) updated_at = datetime.utcnow() - timeout = functools.partial(_timeout_notifications, timeout_start=timeout_start, updated_at=updated_at) + current_statuses = [NOTIFICATION_SENDING, NOTIFICATION_PENDING] + new_status = NOTIFICATION_TEMPORARY_FAILURE - # Notifications still in sending or pending status are marked with a temporary-failure: - temporary_failure_notifications = timeout([NOTIFICATION_SENDING, NOTIFICATION_PENDING], - NOTIFICATION_TEMPORARY_FAILURE) + # TEMPORARY: limit the notifications to 100K as otherwise we + # see an issues where the task vanishes after it starts executing + # - we believe this is a OOM error but there are no logs. From + # experimentation we've found we can safely process up to 100K. + notifications = Notification.query.filter( + Notification.created_at < timeout_start, + Notification.status.in_(current_statuses), + Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]) + ).limit(100000).all() + + Notification.query.filter( + Notification.created_at < timeout_start, + Notification.status.in_(current_statuses), + Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]), + Notification.id.in_([n.id for n in notifications]), + ).update( + {'status': new_status, 'updated_at': updated_at}, + synchronize_session=False + ) db.session.commit() - - return temporary_failure_notifications + return notifications def is_delivery_slow_for_providers(