diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index 563eba68f..f0c777081 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -81,12 +81,6 @@ def dao_get_jobs_by_service_id( items = db.session.execute(stmt).scalars().all() return Pagination(items, page, page_size, total_items) - # return ( - # Job.query.filter(*query_filter) - # .order_by(Job.processing_started.desc(), Job.created_at.desc()) - # .paginate(page=page, per_page=page_size) - # ) - def dao_get_scheduled_job_stats( service_id, @@ -121,15 +115,15 @@ def dao_set_scheduled_jobs_to_pending(): the transaction so that if the task is run more than once concurrently, one task will block the other select from completing until it commits. """ - jobs = ( - Job.query.filter( + stmt = ( + select( Job.job_status == JobStatus.SCHEDULED, Job.scheduled_for < utc_now(), ) .order_by(asc(Job.scheduled_for)) .with_for_update() - .all() ) + jobs = db.session.execute(stmt).all() for job in jobs: job.job_status = JobStatus.PENDING @@ -141,12 +135,13 @@ def dao_set_scheduled_jobs_to_pending(): def dao_get_future_scheduled_job_by_id_and_service_id(job_id, service_id): - return Job.query.filter( + stmt = select(Job).filter( Job.service_id == service_id, Job.id == job_id, Job.job_status == JobStatus.SCHEDULED, Job.scheduled_for > utc_now(), - ).one() + ) + return db.session.execute(stmt).scalars().one() def dao_create_job(job):