This commit is contained in:
Kenneth Kehl
2024-11-19 09:04:33 -08:00
parent bfcc8ac708
commit 67aa1e66a6
2 changed files with 15 additions and 22 deletions

View File

@@ -108,42 +108,28 @@ def check_job_status():
Job.job_status == JobStatus.IN_PROGRESS,
between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
)
print(f"QUERY 1 {incomplete_in_progress_jobs}")
incomplete_pending_jobs = select(Job).filter(
Job.job_status == JobStatus.PENDING,
Job.scheduled_for.isnot(None),
between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
)
print(f"QUERY 2 {incomplete_pending_jobs}")
jobs_not_complete_after_30_minutes = (
db.session.execute(
union(incomplete_in_progress_jobs, incomplete_pending_jobs).order_by(
Job.processing_started, Job.scheduled_for
)
jobs_not_complete_after_30_minutes = db.session.execute(
union(incomplete_in_progress_jobs, incomplete_pending_jobs).order_by(
Job.processing_started, Job.scheduled_for
)
.all()
)
print(f"HERE IS JOBS {jobs_not_complete_after_30_minutes}")
).all()
# temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
# if they haven't been re-processed in time.
job_ids = []
for job in jobs_not_complete_after_30_minutes:
print(f"HERE IS A JOB {job}")
# job.job_status = JobStatus.ERROR
# print("CHANGED JOB STATUS TO ERROR")
# dao_update_job(job)
db.session.execute(
update(Job).where(Job.id == job.id).values(job_status=JobStatus.ERROR)
)
db.session.commit()
job_ids.append(str(job.id))
print(f"APPENDED NEW JOB ID TO LIST WHICH IS {job_ids}")
if job_ids:
current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
process_incomplete_jobs.apply_async([job_ids], queue=QueueNames.JOBS)