Remove unnecessary _timeout partial function

It's no longer necessary to have a separate function that's now
only called once. While sometimes the separation can bring clarity,
here I think it's clearer to have all the code in one place, and
avoid the functools complexity we had before.
This commit is contained in:
Ben Thorner
2021-11-25 17:52:16 +00:00
parent 0318229216
commit 97b58ed4c3

View File

@@ -1,4 +1,3 @@
import functools
from datetime import datetime, timedelta
from itertools import groupby
from operator import attrgetter
@@ -40,7 +39,6 @@ from app.models import (
NOTIFICATION_SENDING,
NOTIFICATION_SENT,
NOTIFICATION_STATUS_TYPES_COMPLETED,
NOTIFICATION_TECHNICAL_FAILURE,
NOTIFICATION_TEMPORARY_FAILURE,
SMS_TYPE,
FactNotificationStatus,
@@ -489,29 +487,6 @@ def dao_delete_notifications_by_id(notification_id):
).delete(synchronize_session='fetch')
def _timeout_notifications(current_statuses, new_status, timeout_start, updated_at):
# TEMPORARY: limit the notifications to 100K as otherwise we
# see an issues where the task vanishes after it starts executing
# - we believe this is a OOM error but there are no logs. From
# experimentation we've found we can safely process up to 100K.
notifications = Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
).limit(100000).all()
Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]),
Notification.id.in_([n.id for n in notifications]),
).update(
{'status': new_status, 'updated_at': updated_at},
synchronize_session=False
)
return notifications
def dao_check_notifications_still_in_created(minimum_age_in_seconds):
min_created_at = datetime.utcnow() - timedelta(seconds=minimum_age_in_seconds)
@@ -534,15 +509,31 @@ def dao_timeout_notifications(timeout_period_in_seconds):
"""
timeout_start = datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds)
updated_at = datetime.utcnow()
timeout = functools.partial(_timeout_notifications, timeout_start=timeout_start, updated_at=updated_at)
current_statuses = [NOTIFICATION_SENDING, NOTIFICATION_PENDING]
new_status = NOTIFICATION_TEMPORARY_FAILURE
# Notifications still in sending or pending status are marked with a temporary-failure:
temporary_failure_notifications = timeout([NOTIFICATION_SENDING, NOTIFICATION_PENDING],
NOTIFICATION_TEMPORARY_FAILURE)
# TEMPORARY: limit the notifications to 100K as otherwise we
# see an issues where the task vanishes after it starts executing
# - we believe this is a OOM error but there are no logs. From
# experimentation we've found we can safely process up to 100K.
notifications = Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE])
).limit(100000).all()
Notification.query.filter(
Notification.created_at < timeout_start,
Notification.status.in_(current_statuses),
Notification.notification_type.in_([SMS_TYPE, EMAIL_TYPE]),
Notification.id.in_([n.id for n in notifications]),
).update(
{'status': new_status, 'updated_at': updated_at},
synchronize_session=False
)
db.session.commit()
return temporary_failure_notifications
return notifications
def is_delivery_slow_for_providers(