start on jobs dao

This commit is contained in:
Kenneth Kehl
2024-10-15 13:26:13 -07:00
parent 9c95e588d1
commit f83032c4bc

View File

@@ -3,7 +3,7 @@ import uuid
from datetime import timedelta
from flask import current_app
from sqlalchemy import and_, asc, desc, func
from sqlalchemy import and_, asc, desc, func, select
from app import db
from app.enums import JobStatus
@@ -18,36 +18,33 @@ from app.utils import midnight_n_days_ago, utc_now
def dao_get_notification_outcomes_for_job(service_id, job_id):
notification_statuses = (
db.session.query(
func.count(Notification.status).label("count"), Notification.status
)
stmt = (
select(func.count(Notification.status).label("count"), Notification.status)
.filter(Notification.service_id == service_id, Notification.job_id == job_id)
.group_by(Notification.status)
.all()
)
notification_statuses = db.session.execute(stmt).all()
if not notification_statuses:
notification_statuses = (
db.session.query(
FactNotificationStatus.notification_count.label("count"),
FactNotificationStatus.notification_status.label("status"),
)
.filter(
FactNotificationStatus.service_id == service_id,
FactNotificationStatus.job_id == job_id,
)
.all()
stmt = select(
FactNotificationStatus.notification_count.label("count"),
FactNotificationStatus.notification_status.label("status"),
).filter(
FactNotificationStatus.service_id == service_id,
FactNotificationStatus.job_id == job_id,
)
notification_statuses = db.session.execute(stmt).all()
return notification_statuses
def dao_get_job_by_service_id_and_job_id(service_id, job_id):
return Job.query.filter_by(service_id=service_id, id=job_id).one()
stmt = select(Job).filter_by(service_id=service_id, id=job_id)
return db.session.execute(stmt).scalars().one()
def dao_get_unfinished_jobs():
return Job.query.filter(Job.processing_finished.is_(None)).all()
stmt = select(Job).filter(Job.processing_finished.is_(None))
return db.session.execute(stmt).all()
def dao_get_jobs_by_service_id(
@@ -67,8 +64,9 @@ def dao_get_jobs_by_service_id(
query_filter.append(Job.created_at >= midnight_n_days_ago(limit_days))
if statuses is not None and statuses != [""]:
query_filter.append(Job.job_status.in_(statuses))
return (
Job.query.filter(*query_filter)
select(*query_filter)
.order_by(Job.processing_started.desc(), Job.created_at.desc())
.paginate(page=page, per_page=page_size)
)
@@ -77,21 +75,19 @@ def dao_get_jobs_by_service_id(
def dao_get_scheduled_job_stats(
service_id,
):
return (
db.session.query(
func.count(Job.id),
func.min(Job.scheduled_for),
)
.filter(
Job.service_id == service_id,
Job.job_status == JobStatus.SCHEDULED,
)
.one()
stmt = select(
func.count(Job.id),
func.min(Job.scheduled_for),
).filter(
Job.service_id == service_id,
Job.job_status == JobStatus.SCHEDULED,
)
return db.session.execute(stmt).all()
def dao_get_job_by_id(job_id):
return Job.query.filter_by(id=job_id).one()
stmt = select(Job).filter_by(id=job_id)
return db.session.execute(stmt).scalars().one()
def dao_archive_job(job):