mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 01:41:05 -05:00
Update the timeout_notifications scheduled tasks.
We found that if the notifications were in created or pending they are not purged from notifications. - New bulk update method to set all notificaitons with: - a status = created|sending|pending to temporary-failure - and is older then today minus SENDING_NOTIFICATIONS_TIMEOUT_PERIOD (in seconds) - the scheduled task to timeout notifications use the new bulk update query. - the task will be more efficient
This commit is contained in:
@@ -7,8 +7,8 @@ from app.aws import s3
|
|||||||
from app import notify_celery
|
from app import notify_celery
|
||||||
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago
|
||||||
from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than
|
from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than
|
||||||
from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \
|
from app.dao.notifications_dao import (delete_notifications_created_more_than_a_week_ago,
|
||||||
update_notification_status_by_id
|
dao_timeout_notifications)
|
||||||
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
|
||||||
from app.statsd_decorators import statsd
|
from app.statsd_decorators import statsd
|
||||||
from app.models import JOB_STATUS_PENDING
|
from app.models import JOB_STATUS_PENDING
|
||||||
@@ -111,21 +111,13 @@ def delete_invitations():
|
|||||||
@notify_celery.task(name='timeout-sending-notifications')
|
@notify_celery.task(name='timeout-sending-notifications')
|
||||||
@statsd(namespace="tasks")
|
@statsd(namespace="tasks")
|
||||||
def timeout_notifications():
|
def timeout_notifications():
|
||||||
# TODO: optimize the query by adding the date where clause to this query.
|
try:
|
||||||
notifications = get_notifications(filter_dict={'status': 'sending'})
|
updated = dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD'))
|
||||||
now = datetime.utcnow()
|
if updated:
|
||||||
for noti in notifications:
|
current_app.logger.info(
|
||||||
try:
|
"Timeout period reached for {} notifications, status has been updated.".format(updated))
|
||||||
if (now - noti.created_at) > timedelta(
|
except Exception as e:
|
||||||
seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')
|
current_app.logger.exception(e)
|
||||||
):
|
current_app.logger.error(
|
||||||
# TODO: think about making this a bulk update rather than one at a time.
|
"Exception raised trying to timeout notification skipping notification update."
|
||||||
updated = update_notification_status_by_id(noti.id, 'temporary-failure')
|
)
|
||||||
if updated:
|
|
||||||
current_app.logger.info(
|
|
||||||
"Timeout period reached for notification ({}), status has been updated.".format(noti.id))
|
|
||||||
except Exception as e:
|
|
||||||
current_app.logger.exception(e)
|
|
||||||
current_app.logger.error(
|
|
||||||
"Exception raised trying to timeout notification ({}) skipping notification update.".format(noti.id)
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -17,7 +17,11 @@ from app.models import (
|
|||||||
Notification,
|
Notification,
|
||||||
NotificationHistory,
|
NotificationHistory,
|
||||||
NotificationStatistics,
|
NotificationStatistics,
|
||||||
Template)
|
Template,
|
||||||
|
NOTIFICATION_CREATED,
|
||||||
|
NOTIFICATION_SENDING,
|
||||||
|
NOTIFICATION_PENDING,
|
||||||
|
NOTIFICATION_TEMPORARY_FAILURE)
|
||||||
|
|
||||||
from app.dao.dao_utils import transactional
|
from app.dao.dao_utils import transactional
|
||||||
from app.statsd_decorators import statsd
|
from app.statsd_decorators import statsd
|
||||||
@@ -286,3 +290,18 @@ def dao_delete_notifications_and_history_by_id(notification_id):
|
|||||||
db.session.query(NotificationHistory).filter(
|
db.session.query(NotificationHistory).filter(
|
||||||
NotificationHistory.id == notification_id
|
NotificationHistory.id == notification_id
|
||||||
).delete(synchronize_session='fetch')
|
).delete(synchronize_session='fetch')
|
||||||
|
|
||||||
|
|
||||||
|
def dao_timeout_notifications(timeout_period_in_seconds):
|
||||||
|
# update all notifications that are older that the timeout_period_in_seconds
|
||||||
|
# with a status of created|sending|pending
|
||||||
|
updated = db.session.query(Notification). \
|
||||||
|
filter(Notification.created_at < (datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds))). \
|
||||||
|
filter(Notification.status.in_([NOTIFICATION_CREATED, NOTIFICATION_SENDING, NOTIFICATION_PENDING])). \
|
||||||
|
update({'status': NOTIFICATION_TEMPORARY_FAILURE}, synchronize_session=False)
|
||||||
|
db.session.query(NotificationHistory). \
|
||||||
|
filter(NotificationHistory.created_at < (datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds))). \
|
||||||
|
filter(NotificationHistory.status.in_([NOTIFICATION_CREATED, NOTIFICATION_SENDING, NOTIFICATION_PENDING])). \
|
||||||
|
update({'status': NOTIFICATION_TEMPORARY_FAILURE}, synchronize_session=False)
|
||||||
|
db.session.commit()
|
||||||
|
return updated
|
||||||
|
|||||||
@@ -34,8 +34,8 @@ from app.dao.notifications_dao import (
|
|||||||
get_notifications_for_service,
|
get_notifications_for_service,
|
||||||
update_notification_status_by_id,
|
update_notification_status_by_id,
|
||||||
update_notification_status_by_reference,
|
update_notification_status_by_reference,
|
||||||
dao_delete_notifications_and_history_by_id
|
dao_delete_notifications_and_history_by_id,
|
||||||
)
|
dao_timeout_notifications)
|
||||||
|
|
||||||
from notifications_utils.template import get_sms_fragment_count
|
from notifications_utils.template import get_sms_fragment_count
|
||||||
|
|
||||||
@@ -810,3 +810,45 @@ def _notification_json(sample_template, job_id=None, id=None, status=None):
|
|||||||
if status:
|
if status:
|
||||||
data.update({'status': status})
|
data.update({'status': status})
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def test_dao_timeout_notifications(notify_db, notify_db_session,):
|
||||||
|
with freeze_time(datetime.utcnow() - timedelta(minutes=1)):
|
||||||
|
created = sample_notification(notify_db, notify_db_session)
|
||||||
|
sending = sample_notification(notify_db, notify_db_session, status='sending')
|
||||||
|
pending = sample_notification(notify_db, notify_db_session, status='pending')
|
||||||
|
delivered = sample_notification(notify_db, notify_db_session, status='delivered')
|
||||||
|
|
||||||
|
assert Notification.query.get(created.id).status == 'created'
|
||||||
|
assert Notification.query.get(sending.id).status == 'sending'
|
||||||
|
assert Notification.query.get(pending.id).status == 'pending'
|
||||||
|
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||||
|
updated = dao_timeout_notifications(1)
|
||||||
|
assert Notification.query.get(created.id).status == 'temporary-failure'
|
||||||
|
assert Notification.query.get(sending.id).status == 'temporary-failure'
|
||||||
|
assert Notification.query.get(pending.id).status == 'temporary-failure'
|
||||||
|
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||||
|
assert NotificationHistory.query.get(created.id).status == 'temporary-failure'
|
||||||
|
assert NotificationHistory.query.get(sending.id).status == 'temporary-failure'
|
||||||
|
assert NotificationHistory.query.get(pending.id).status == 'temporary-failure'
|
||||||
|
assert NotificationHistory.query.get(delivered.id).status == 'delivered'
|
||||||
|
assert updated == 3
|
||||||
|
|
||||||
|
|
||||||
|
def test_dao_timeout_notifications_only_updates_for_older_notifications(notify_db, notify_db_session):
|
||||||
|
with freeze_time(datetime.utcnow() + timedelta(minutes=10)):
|
||||||
|
created = sample_notification(notify_db, notify_db_session)
|
||||||
|
sending = sample_notification(notify_db, notify_db_session, status='sending')
|
||||||
|
pending = sample_notification(notify_db, notify_db_session, status='pending')
|
||||||
|
delivered = sample_notification(notify_db, notify_db_session, status='delivered')
|
||||||
|
|
||||||
|
assert Notification.query.get(created.id).status == 'created'
|
||||||
|
assert Notification.query.get(sending.id).status == 'sending'
|
||||||
|
assert Notification.query.get(pending.id).status == 'pending'
|
||||||
|
assert Notification.query.get(delivered.id).status == 'delivered'
|
||||||
|
updated = dao_timeout_notifications(1)
|
||||||
|
assert NotificationHistory.query.get(created.id).status == 'created'
|
||||||
|
assert NotificationHistory.query.get(sending.id).status == 'sending'
|
||||||
|
assert NotificationHistory.query.get(pending.id).status == 'pending'
|
||||||
|
assert NotificationHistory.query.get(delivered.id).status == 'delivered'
|
||||||
|
assert updated == 0
|
||||||
|
|||||||
Reference in New Issue
Block a user