From 1bfd25107b413bbc58be222c8697087c23bdf92e Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Fri, 8 Jul 2016 14:48:07 +0100 Subject: [PATCH] Add a lockmode for the update_notifications_status_by_id to prevent the timeout task from updating the same notification more than once. This happens because more than one beat process was creating the timeout task, resulting in multiple workers running the same queries at the same time. --- app/celery/scheduled_tasks.py | 10 ++++++---- app/dao/notifications_dao.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 43528d74f..a1145a0f3 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -76,6 +76,7 @@ def delete_invitations(): @notify_celery.task(name='timeout-sending-notifications') def timeout_notifications(): + # TODO: optimize the query by adding the date where clause to this query. notifications = get_notifications(filter_dict={'status': 'sending'}) now = datetime.utcnow() for noti in notifications: @@ -83,10 +84,11 @@ def timeout_notifications(): if (now - noti.created_at) > timedelta( seconds=current_app.config.get('SENDING_NOTIFICATIONS_TIMEOUT_PERIOD') ): - update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE) - current_app.logger.info(( - "Timeout period reached for notification ({})" - ", status has been updated.").format(noti.id)) + # TODO: think about making this a bulk update rather than one at a time. + updated = update_notification_status_by_id(noti.id, 'temporary-failure', STATISTICS_FAILURE) + if updated: + current_app.logger.info(("Timeout period reached for notification ({})" + ", status has been updated.").format(noti.id)) except Exception as e: current_app.logger.exception(e) current_app.logger.error(( diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 29044b9bd..4160c6261 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -246,7 +246,7 @@ def _update_notification_status(notification, status, notification_statistics_st @transactional def update_notification_status_by_id(notification_id, status, notification_statistics_status=None): - notification = Notification.query.filter( + notification = Notification.query.with_lockmode("update").filter( Notification.id == notification_id, or_(Notification.status == 'created', Notification.status == 'sending',