diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 3597bdbb7..22155a40e 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -1,10 +1,10 @@ from datetime import timedelta from flask import current_app -from sqlalchemy import between +from sqlalchemy import between, select from sqlalchemy.exc import SQLAlchemyError -from app import notify_celery, zendesk_client +from app import db, notify_celery, zendesk_client from app.celery.tasks import ( get_recipient_csv_and_template_and_sender_id, process_incomplete_jobs, @@ -105,20 +105,21 @@ def check_job_status(): thirty_minutes_ago = utc_now() - timedelta(minutes=30) thirty_five_minutes_ago = utc_now() - timedelta(minutes=35) - incomplete_in_progress_jobs = Job.query.filter( + incomplete_in_progress_jobs = select(Job).where( Job.job_status == JobStatus.IN_PROGRESS, between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago), ) - incomplete_pending_jobs = Job.query.filter( + incomplete_pending_jobs = select(Job).where( Job.job_status == JobStatus.PENDING, Job.scheduled_for.isnot(None), between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago), ) + jobs_not_complete_after_30_minutes = incomplete_in_progress_jobs.union( + incomplete_pending_jobs + ).order_by(Job.processing_started, Job.scheduled_for) jobs_not_complete_after_30_minutes = ( - incomplete_in_progress_jobs.union(incomplete_pending_jobs) - .order_by(Job.processing_started, Job.scheduled_for) - .all() + db.session.execute(jobs_not_complete_after_30_minutes).scalars().all() ) # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks diff --git a/app/dao/inbound_sms_dao.py b/app/dao/inbound_sms_dao.py index 433b4b4c9..1687bd56f 100644 --- a/app/dao/inbound_sms_dao.py +++ b/app/dao/inbound_sms_dao.py @@ -84,7 +84,7 @@ def dao_count_inbound_sms_for_service(service_id, limit_days): def _insert_inbound_sms_history(subquery, query_limit=10000): offset = 0 subquery_select = select(subquery) - inbound_sms_query = select( + inbound_sms_querie = select( InboundSms.id, InboundSms.created_at, InboundSms.service_id, @@ -94,13 +94,13 @@ def _insert_inbound_sms_history(subquery, query_limit=10000): InboundSms.provider, ).where(InboundSms.id.in_(subquery_select)) - count_query = select(func.count()).select_from(inbound_sms_query.subquery()) + count_query = select(func.count()).select_from(inbound_sms_querie.subquery()) inbound_sms_count = db.session.execute(count_query).scalar() or 0 while offset < inbound_sms_count: statement = insert(InboundSmsHistory).from_select( InboundSmsHistory.__table__.c, - inbound_sms_query.limit(query_limit).offset(offset), + inbound_sms_querie.limit(query_limit).offset(offset), ) statement = statement.on_conflict_do_nothing(