diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 097448ed8..dea2196fc 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -1,4 +1,4 @@ -from datetime import datetime, timedelta +from datetime import datetime from flask import current_app from sqlalchemy.exc import SQLAlchemyError @@ -6,12 +6,11 @@ from sqlalchemy.exc import SQLAlchemyError from app.aws import s3 from app import notify_celery from app.dao.invited_user_dao import delete_invitations_created_more_than_two_days_ago -from app.dao.jobs_dao import dao_get_scheduled_jobs, dao_update_job, dao_get_jobs_older_than +from app.dao.jobs_dao import dao_set_scheduled_jobs_to_pending, dao_get_jobs_older_than from app.dao.notifications_dao import (delete_notifications_created_more_than_a_week_ago, dao_timeout_notifications) from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago from app.statsd_decorators import statsd -from app.models import JOB_STATUS_PENDING from app.celery.tasks import process_job @@ -27,24 +26,14 @@ def remove_csv_files(): @notify_celery.task(name="run-scheduled-jobs") @statsd(namespace="tasks") def run_scheduled_jobs(): - from app import db try: - jobs = dao_get_scheduled_jobs() - for job in jobs: - job.job_status = JOB_STATUS_PENDING - from time import sleep - sleep(1) - print('SCHEDULING' + str(job.id)) - # dao_update_job(job) - # process_job.apply_async([str(job.id)], queue="process-job") + for job in dao_set_scheduled_jobs_to_pending(): + process_job.apply_async([str(job.id)], queue="process-job") current_app.logger.info( - "Job ID {} added to process job queue".format(job.id) + "Job ID {} added to process job queue".format(job.id) ) - db.session.add_all(jobs) - db.session.commit() except SQLAlchemyError as e: current_app.logger.exception("Failed to run scheduled jobs", e) - db.session.rollback() raise diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index b9b677708..9d674b222 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -4,7 +4,7 @@ from sqlalchemy import func, desc, asc, cast, Date as sql_date from app import db from app.dao import days_ago -from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED +from app.models import Job, NotificationHistory, JOB_STATUS_SCHEDULED, JOB_STATUS_PENDING from app.statsd_decorators import statsd @@ -45,8 +45,15 @@ def dao_get_job_by_id(job_id): return Job.query.filter_by(id=job_id).one() -def dao_get_scheduled_jobs(): - return Job.query \ +def dao_set_scheduled_jobs_to_pending(): + """ + Sets all past scheduled jobs to pending, and then returns them for further processing. + + this is used in the run_scheduled_jobs task, so we put a FOR UPDATE lock on the job table for the duration of + the transaction so that if the task is run more than once concurrently, one task will block the other select + from completing until it commits. + """ + jobs = Job.query \ .filter( Job.job_status == JOB_STATUS_SCHEDULED, Job.scheduled_for < datetime.utcnow() @@ -55,6 +62,15 @@ def dao_get_scheduled_jobs(): .with_for_update() \ .all() + for job in jobs: + job.job_status = JOB_STATUS_PENDING + + db.session.add_all(jobs) + db.session.commit() + + return jobs + + def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id): return Job.query \