diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index c960c3b20..c58ecc557 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -7,8 +7,8 @@ from app.aws import s3 from app import notify_celery from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than -from app.dao.notifications_dao import delete_notifications_created_more_than_a_week_ago, get_notifications, \ - update_notification_status_by_id +from app.dao.notifications_dao import (delete_notifications_created_more_than_a_week_ago, + dao_timeout_notifications) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.statsd_decorators import statsd from app.models import JOB_STATUS_PENDING @@ -111,21 +111,13 @@ def delete_invitations(): @notify_celery.task(name='timeout-sending-notifications') @statsd(namespace="tasks") def timeout_notifications(): - # TODO: optimize the query by adding the date where clause to this query. - notifications = get_notifications(filter_dict={'status': 'sending'}) - now = datetime.utcnow() - for noti in notifications: - try: - if (now - noti.created_at) > timedelta( - seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') - ): - # TODO: think about making this a bulk update rather than one at a time. - updated = update_notification_status_by_id(noti.id, 'temporary-failure') - if updated: - current_app.logger.info( - "Timeout period reached for notification ({}), status has been updated.".format(noti.id)) - except Exception as e: - current_app.logger.exception(e) - current_app.logger.error( - "Exception raised trying to timeout notification ({}) skipping notification update.".format(noti.id) - ) + try: + updated = dao_timeout_notifications(current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD')) + if updated: + current_app.logger.info( + "Timeout period reached for {} notifications, status has been updated.".format(updated)) + except Exception as e: + current_app.logger.exception(e) + current_app.logger.error( + "Exception raised trying to timeout notification skipping notification update." + ) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index aaab8f5e2..80cba3eb9 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -17,7 +17,11 @@ from app.models import ( Notification, NotificationHistory, NotificationStatistics, - Template) + Template, + NOTIFICATION_CREATED, + NOTIFICATION_SENDING, + NOTIFICATION_PENDING, + NOTIFICATION_TEMPORARY_FAILURE) from app.dao.dao_utils import transactional from app.statsd_decorators import statsd @@ -286,3 +290,18 @@ def dao_delete_notifications_and_history_by_id(notification_id): db.session.query(NotificationHistory).filter( NotificationHistory.id == notification_id ).delete(synchronize_session='fetch') + + +def dao_timeout_notifications(timeout_period_in_seconds): + # update all notifications that are older that the timeout_period_in_seconds + # with a status of created|sending|pending + updated = db.session.query(Notification). \ + filter(Notification.created_at < (datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds))). \ + filter(Notification.status.in_([NOTIFICATION_CREATED, NOTIFICATION_SENDING, NOTIFICATION_PENDING])). \ + update({'status': NOTIFICATION_TEMPORARY_FAILURE}, synchronize_session=False) + db.session.query(NotificationHistory). \ + filter(NotificationHistory.created_at < (datetime.utcnow() - timedelta(seconds=timeout_period_in_seconds))). \ + filter(NotificationHistory.status.in_([NOTIFICATION_CREATED, NOTIFICATION_SENDING, NOTIFICATION_PENDING])). \ + update({'status': NOTIFICATION_TEMPORARY_FAILURE}, synchronize_session=False) + db.session.commit() + return updated diff --git a/tests/app/dao/test_notification_dao.py b/tests/app/dao/test_notification_dao.py index 3ef686713..55029160f 100644 --- a/tests/app/dao/test_notification_dao.py +++ b/tests/app/dao/test_notification_dao.py @@ -34,8 +34,8 @@ from app.dao.notifications_dao import ( get_notifications_for_service, update_notification_status_by_id, update_notification_status_by_reference, - dao_delete_notifications_and_history_by_id -) + dao_delete_notifications_and_history_by_id, + dao_timeout_notifications) from notifications_utils.template import get_sms_fragment_count @@ -810,3 +810,45 @@ def _notification_json(sample_template, job_id=None, id=None, status=None): if status: data.update({'status': status}) return data + + +def test_dao_timeout_notifications(notify_db, notify_db_session,): + with freeze_time(datetime.utcnow() - timedelta(minutes=1)): + created = sample_notification(notify_db, notify_db_session) + sending = sample_notification(notify_db, notify_db_session, status='sending') + pending = sample_notification(notify_db, notify_db_session, status='pending') + delivered = sample_notification(notify_db, notify_db_session, status='delivered') + + assert Notification.query.get(created.id).status == 'created' + assert Notification.query.get(sending.id).status == 'sending' + assert Notification.query.get(pending.id).status == 'pending' + assert Notification.query.get(delivered.id).status == 'delivered' + updated = dao_timeout_notifications(1) + assert Notification.query.get(created.id).status == 'temporary-failure' + assert Notification.query.get(sending.id).status == 'temporary-failure' + assert Notification.query.get(pending.id).status == 'temporary-failure' + assert Notification.query.get(delivered.id).status == 'delivered' + assert NotificationHistory.query.get(created.id).status == 'temporary-failure' + assert NotificationHistory.query.get(sending.id).status == 'temporary-failure' + assert NotificationHistory.query.get(pending.id).status == 'temporary-failure' + assert NotificationHistory.query.get(delivered.id).status == 'delivered' + assert updated == 3 + + +def test_dao_timeout_notifications_only_updates_for_older_notifications(notify_db, notify_db_session): + with freeze_time(datetime.utcnow() + timedelta(minutes=10)): + created = sample_notification(notify_db, notify_db_session) + sending = sample_notification(notify_db, notify_db_session, status='sending') + pending = sample_notification(notify_db, notify_db_session, status='pending') + delivered = sample_notification(notify_db, notify_db_session, status='delivered') + + assert Notification.query.get(created.id).status == 'created' + assert Notification.query.get(sending.id).status == 'sending' + assert Notification.query.get(pending.id).status == 'pending' + assert Notification.query.get(delivered.id).status == 'delivered' + updated = dao_timeout_notifications(1) + assert NotificationHistory.query.get(created.id).status == 'created' + assert NotificationHistory.query.get(sending.id).status == 'sending' + assert NotificationHistory.query.get(pending.id).status == 'pending' + assert NotificationHistory.query.get(delivered.id).status == 'delivered' + assert updated == 0