From 897ad6a95739e9ce41fef512f6b5dd46c03dcccd Mon Sep 17 00:00:00 2001 From: Martyn Inglis Date: Fri, 7 Oct 2016 10:47:48 +0100 Subject: [PATCH] prevent race conditions in run_scheduled_jobs queuing jobs multiple times we were running into issues where multiple beats queue up the run_scheduled_jobs task at the same time, and concurrency issues with selecting scheduled jobs causes both tasks to trigger processing of the job. Use with_for_update, which calls through to the postgres SELECT ... FOR UPDATE which locks other SELECT FOR UPDATES (ie other threads running same code) until the rows are set to pending and the transaction completes - so the second thread will not find any rows --- app/celery/scheduled_tasks.py | 11 +++++++++-- app/dao/jobs_dao.py | 1 + lambda_function/README.md | 9 +++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) create mode 100644 lambda_function/README.md diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 4448079f0..097448ed8 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -27,17 +27,24 @@ def remove_csv_files(): @notify_celery.task(name="run-scheduled-jobs") @statsd(namespace="tasks") def run_scheduled_jobs(): + from app import db try: jobs = dao_get_scheduled_jobs() for job in jobs: job.job_status = JOB_STATUS_PENDING - dao_update_job(job) - process_job.apply_async([str(job.id)], queue="process-job") + from time import sleep + sleep(1) + print('SCHEDULING' + str(job.id)) + # dao_update_job(job) + # process_job.apply_async([str(job.id)], queue="process-job") current_app.logger.info( "Job ID {} added to process job queue".format(job.id) ) + db.session.add_all(jobs) + db.session.commit() except SQLAlchemyError as e: current_app.logger.exception("Failed to run scheduled jobs", e) + db.session.rollback() raise diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 41ba00940..b9b677708 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -52,6 +52,7 @@ def dao_get_scheduled_jobs(): Job.scheduled_for < datetime.utcnow() ) \ .order_by(asc(Job.scheduled_for)) \ + .with_for_update() \ .all() diff --git a/lambda_function/README.md b/lambda_function/README.md new file mode 100644 index 000000000..3a8df5a1a --- /dev/null +++ b/lambda_function/README.md @@ -0,0 +1,9 @@ +# Lambda function + +This code is used to setup a lambda function to call into our API to deliver messages. + +Basic expected flow is that an request to GOV.UK Notify to send a text/email will persist in our DB then invoke an async lambda function. + +This function will call /deliver/notification/{id} which will make the API call to our delivery partners, updating the DB with the response. + +## EXPERIMENTAL AT THIS STAGE