change time range for incomplete jobs

This commit is contained in:
Kenneth Kehl
2024-12-27 10:13:28 -08:00
parent 45a1492c99
commit 85a55dbaed

View File

@@ -100,27 +100,29 @@ def check_job_status():
select select
from jobs from jobs
where job_status == 'in progress' where job_status == 'in progress'
and processing started between 30 and 35 minutes ago and processing started some time ago
OR where the job_status == 'pending' OR where the job_status == 'pending'
and the job scheduled_for timestamp is between 30 and 35 minutes ago. and the job scheduled_for timestamp is some time ago.
if any results then if any results then
update the job_status to 'error' update the job_status to 'error'
process the rows in the csv that are missing (in another task) just do the check here. process the rows in the csv that are missing (in another task) just do the check here.
""" """
thirty_minutes_ago = utc_now() - timedelta(minutes=30) START_MINUTES = 245
thirty_five_minutes_ago = utc_now() - timedelta(minutes=35) END_MINUTES = 240
end_minutes_ago = utc_now() - timedelta(minutes=END_MINUTES)
start_minutes_ago = utc_now() - timedelta(minutes=START_MINUTES)
incomplete_in_progress_jobs = Job.query.filter( incomplete_in_progress_jobs = Job.query.filter(
Job.job_status == JobStatus.IN_PROGRESS, Job.job_status == JobStatus.IN_PROGRESS,
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago), between(Job.processing_started, start_minutes_ago, end_minutes_ago),
) )
incomplete_pending_jobs = Job.query.filter( incomplete_pending_jobs = Job.query.filter(
Job.job_status == JobStatus.PENDING, Job.job_status == JobStatus.PENDING,
Job.scheduled_for.isnot(None), Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago), between(Job.scheduled_for, start_minutes_ago, end_minutes_ago),
) )
jobs_not_complete_after_30_minutes = ( jobs_not_complete_after_allotted_time = (
incomplete_in_progress_jobs.union(incomplete_pending_jobs) incomplete_in_progress_jobs.union(incomplete_pending_jobs)
.order_by(Job.processing_started, Job.scheduled_for) .order_by(Job.processing_started, Job.scheduled_for)
.all() .all()
@@ -129,7 +131,7 @@ def check_job_status():
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time. # if they haven't been re-processed in time.
job_ids = [] job_ids = []
for job in jobs_not_complete_after_30_minutes: for job in jobs_not_complete_after_allotted_time:
job.job_status = JobStatus.ERROR job.job_status = JobStatus.ERROR
dao_update_job(job) dao_update_job(job)
job_ids.append(str(job.id)) job_ids.append(str(job.id))