This commit is contained in:
Kenneth Kehl
2024-11-18 14:54:49 -08:00
parent ce5b9cf055
commit 38e7672866
2 changed files with 11 additions and 10 deletions

View File

@@ -1,10 +1,10 @@
from datetime import timedelta
from flask import current_app
from sqlalchemy import between
from sqlalchemy import between, select
from sqlalchemy.exc import SQLAlchemyError
from app import notify_celery, zendesk_client
from app import db, notify_celery, zendesk_client
from app.celery.tasks import (
get_recipient_csv_and_template_and_sender_id,
process_incomplete_jobs,
@@ -105,20 +105,21 @@ def check_job_status():
thirty_minutes_ago = utc_now() - timedelta(minutes=30)
thirty_five_minutes_ago = utc_now() - timedelta(minutes=35)
incomplete_in_progress_jobs = Job.query.filter(
incomplete_in_progress_jobs = select(Job).where(
Job.job_status == JobStatus.IN_PROGRESS,
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
)
incomplete_pending_jobs = Job.query.filter(
incomplete_pending_jobs = select(Job).where(
Job.job_status == JobStatus.PENDING,
Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
)
jobs_not_complete_after_30_minutes = incomplete_in_progress_jobs.union(
incomplete_pending_jobs
).order_by(Job.processing_started, Job.scheduled_for)
jobs_not_complete_after_30_minutes = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for)
.all()
db.session.execute(jobs_not_complete_after_30_minutes).scalars().all()
)
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks

View File

@@ -84,7 +84,7 @@ def dao_count_inbound_sms_for_service(service_id, limit_days):
def _insert_inbound_sms_history(subquery, query_limit=10000):
offset = 0
subquery_select = select(subquery)
inbound_sms_query = select(
inbound_sms_querie = select(
InboundSms.id,
InboundSms.created_at,
InboundSms.service_id,
@@ -94,13 +94,13 @@ def _insert_inbound_sms_history(subquery, query_limit=10000):
InboundSms.provider,
).where(InboundSms.id.in_(subquery_select))
count_query = select(func.count()).select_from(inbound_sms_query.subquery())
count_query = select(func.count()).select_from(inbound_sms_querie.subquery())
inbound_sms_count = db.session.execute(count_query).scalar() or 0
while offset < inbound_sms_count:
statement = insert(InboundSmsHistory).from_select(
InboundSmsHistory.__table__.c,
inbound_sms_query.limit(query_limit).offset(offset),
inbound_sms_querie.limit(query_limit).offset(offset),
)
statement = statement.on_conflict_do_nothing(