diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 3597bdbb7..2d1250c37 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -1,10 +1,10 @@ from datetime import timedelta from flask import current_app -from sqlalchemy import between +from sqlalchemy import between, select, union from sqlalchemy.exc import SQLAlchemyError -from app import notify_celery, zendesk_client +from app import db, notify_celery, zendesk_client from app.celery.tasks import ( get_recipient_csv_and_template_and_sender_id, process_incomplete_jobs, @@ -105,19 +105,23 @@ def check_job_status(): thirty_minutes_ago = utc_now() - timedelta(minutes=30) thirty_five_minutes_ago = utc_now() - timedelta(minutes=35) - incomplete_in_progress_jobs = Job.query.filter( + incomplete_in_progress_jobs = select(Job).filter( Job.job_status == JobStatus.IN_PROGRESS, between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago), ) - incomplete_pending_jobs = Job.query.filter( + incomplete_pending_jobs = select(Job).filter( Job.job_status == JobStatus.PENDING, Job.scheduled_for.isnot(None), between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago), ) jobs_not_complete_after_30_minutes = ( - incomplete_in_progress_jobs.union(incomplete_pending_jobs) - .order_by(Job.processing_started, Job.scheduled_for) + db.session.execute( + union(incomplete_in_progress_jobs, incomplete_pending_jobs).order_by( + Job.processing_started, Job.scheduled_for + ) + ) + .scalars() .all() )