From 67aa1e66a691edd473bbc8168459475dc6b6ebf9 Mon Sep 17 00:00:00 2001
From: Kenneth Kehl <@kkehl@flexion.us>
Date: Tue, 19 Nov 2024 09:04:33 -0800
Subject: [PATCH] fix

---
 app/celery/scheduled_tasks.py | 22 ++++------------------
 app/dao/notifications_dao.py  | 15 +++++++++++----
 2 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index 745ebd785..f51b2d994 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -108,42 +108,28 @@ def check_job_status():
         Job.job_status == JobStatus.IN_PROGRESS,
         between(Job.processing_started, thirty_five_minutes_ago, thirty_minutes_ago),
     )
-    print(f"QUERY 1 {incomplete_in_progress_jobs}")
 
     incomplete_pending_jobs = select(Job).filter(
         Job.job_status == JobStatus.PENDING,
         Job.scheduled_for.isnot(None),
         between(Job.scheduled_for, thirty_five_minutes_ago, thirty_minutes_ago),
     )
-    print(f"QUERY 2 {incomplete_pending_jobs}")
-
-    jobs_not_complete_after_30_minutes = (
-        db.session.execute(
-            union(incomplete_in_progress_jobs, incomplete_pending_jobs).order_by(
-                Job.processing_started, Job.scheduled_for
-            )
+    jobs_not_complete_after_30_minutes = db.session.execute(
+        union(incomplete_in_progress_jobs, incomplete_pending_jobs).order_by(
+            Job.processing_started, Job.scheduled_for
         )
-        .all()
-    )
-    print(f"HERE IS JOBS {jobs_not_complete_after_30_minutes}")
+    ).all()
 
     # temporarily mark them as ERROR so that they don't get picked up by future check_job_status tasks
     # if they haven't been re-processed in time.
     job_ids = []
     for job in jobs_not_complete_after_30_minutes:
-        print(f"HERE IS A JOB {job}")
-        # job.job_status = JobStatus.ERROR
-        # print("CHANGED JOB STATUS TO ERROR")
-        # dao_update_job(job)
-
         db.session.execute(
             update(Job).where(Job.id == job.id).values(job_status=JobStatus.ERROR)
         )
         db.session.commit()
         job_ids.append(str(job.id))
-        print(f"APPENDED NEW JOB ID TO LIST WHICH IS {job_ids}")
-
 
     if job_ids:
         current_app.logger.info("Job(s) {} have not completed.".format(job_ids))
         process_incomplete_jobs.apply_async([job_ids], queue=QueueNames.JOBS)
diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py
index cbde45d30..f60775da9 100644
--- a/app/dao/notifications_dao.py
+++ b/app/dao/notifications_dao.py
@@ -10,6 +10,7 @@ from werkzeug.datastructures import MultiDict
 
 from app import create_uuid, db
 from app.dao.dao_utils import autocommit
+from app.dao.inbound_sms_dao import Pagination
 from app.enums import KeyType, NotificationStatus, NotificationType
 from app.models import FactNotificationStatus, Notification, NotificationHistory
 from app.utils import (
@@ -193,11 +194,17 @@ def get_notifications_for_job(
     if page_size is None:
         page_size = current_app.config["PAGE_SIZE"]
 
-    query = Notification.query.filter_by(service_id=service_id, job_id=job_id)
+    query = select(Notification).filter_by(service_id=service_id, job_id=job_id)
     query = _filter_query(query, filter_dict)
-    return query.order_by(asc(Notification.job_row_number)).paginate(
-        page=page, per_page=page_size
-    )
+    query = query.order_by(asc(Notification.job_row_number))
+
+    results = db.session.execute(query).scalars().all()
+
+    # page_size was already resolved above (falls back to PAGE_SIZE).
+    offset = (page - 1) * page_size
+    paginated_results = results[offset : offset + page_size]
+    pagination = Pagination(paginated_results, page, page_size, len(results))
+    return pagination
 
 
 def dao_get_notification_count_for_job_id(*, job_id):
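
A note for reviewers on the first hunk: the deleted commented-out lines (job.job_status = JobStatus.ERROR / dao_update_job(job)) could not have worked here, because executing a union() of two entity select()s returns plain Row objects rather than Job instances, so there is no ORM object to mutate and flush. That is why the loop flags each stalled job with an explicit update() statement instead. Below is a minimal runnable sketch of the same pattern against a stand-in Job model; the model, status strings, and sample data are illustrative, not the repo's, and the order_by is omitted because ordering a compound SELECT by table-qualified columns is database-sensitive.

from datetime import datetime, timedelta

from sqlalchemy import DateTime, String, between, create_engine, select, union, update
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Job(Base):
    # Stand-in for the repo's Job model, reduced to the columns the query touches.
    __tablename__ = "jobs"
    id: Mapped[int] = mapped_column(primary_key=True)
    job_status: Mapped[str] = mapped_column(String(20))
    processing_started: Mapped[datetime | None] = mapped_column(DateTime)
    scheduled_for: Mapped[datetime | None] = mapped_column(DateTime)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

now = datetime.utcnow()
window_start, window_end = now - timedelta(minutes=35), now - timedelta(minutes=30)

with Session(engine) as session:
    session.add_all(
        [
            Job(id=1, job_status="in progress", processing_started=now - timedelta(minutes=32)),
            Job(id=2, job_status="pending", scheduled_for=now - timedelta(minutes=33)),
            Job(id=3, job_status="in progress", processing_started=now),  # too recent to flag
        ]
    )
    session.commit()

    stalled_in_progress = select(Job).filter(
        Job.job_status == "in progress",
        between(Job.processing_started, window_start, window_end),
    )
    stalled_pending = select(Job).filter(
        Job.job_status == "pending",
        Job.scheduled_for.isnot(None),
        between(Job.scheduled_for, window_start, window_end),
    )

    # union() yields plain Rows keyed by column name, not Job instances,
    # hence the explicit update() per matched id.
    job_ids = []
    for row in session.execute(union(stalled_in_progress, stalled_pending)).all():
        session.execute(update(Job).where(Job.id == row.id).values(job_status="error"))
        session.commit()
        job_ids.append(str(row.id))

    print(sorted(job_ids))  # ['1', '2']

Committing inside the loop keeps each status flip durable even if a later row fails; the alternative of one update(Job).where(Job.id.in_(job_ids)) with a single commit would cut database round-trips but make the batch all-or-nothing.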
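
The second hunk drops Flask-SQLAlchemy's query.paginate() in favor of slicing the full result list and wrapping it in the Pagination class imported from app.dao.inbound_sms_dao. That class is not shown in this diff; what follows is a hedged sketch of the minimal shape implied by the call Pagination(paginated_results, page, page_size, len(results)), with attribute names assumed to mirror what a paginate() result exposes to downstream callers.

import math


class Pagination:
    """Minimal stand-in for the paginate() result object; assumed shape only."""

    def __init__(self, items, page, per_page, total):
        self.items = items        # the rows for the current page
        self.page = page          # 1-based page number
        self.per_page = per_page  # page size used for the slice
        self.total = total        # row count across all pages

    @property
    def pages(self):
        # Number of pages; 0 when the result set is empty.
        return math.ceil(self.total / self.per_page) if self.per_page else 0

    @property
    def has_next(self):
        return self.page < self.pages

    @property
    def has_prev(self):
        return self.page > 1

One trade-off worth flagging in review: the rewrite materializes every matching notification with .all() before slicing in Python, so memory use grows with job size and each page request re-reads the whole job. Pushing limit(page_size).offset((page - 1) * page_size) into the select, with a separate count query for the total, would keep the response shape identical while letting the database do the paging.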