Merge pull request #1569 from GSA/feat/fe-tz-change

Updated endpoint to allow hourly updates to enable utc => local time conversion
This commit is contained in:
Alex Janousek
2025-02-27 10:04:15 -05:00
committed by GitHub
6 changed files with 267 additions and 7 deletions

View File

@@ -93,3 +93,25 @@ def generate_date_range(start_date, end_date=None, days=0):
current_date += timedelta(days=1)
else:
return "An end_date or number of days must be specified"
def generate_hourly_range(start_date, end_date=None, hours=0):
if end_date:
current_time = start_date
while current_time <= end_date:
try:
yield current_time
except ValueError:
pass
current_time += timedelta(hours=1)
elif hours > 0:
end_time = start_date + timedelta(hours=hours)
current_time = start_date
while current_time < end_time:
try:
yield current_time
except ValueError:
pass
current_time += timedelta(hours=1)
else:
return "An end_date or number of hours must be specified"

View File

@@ -8,7 +8,11 @@ from sqlalchemy.sql.expression import and_, asc, case, func
from app import db
from app.dao.dao_utils import VersionOptions, autocommit, version_class
from app.dao.date_util import generate_date_range, get_current_calendar_year
from app.dao.date_util import (
generate_date_range,
generate_hourly_range,
get_current_calendar_year,
)
from app.dao.organization_dao import dao_get_organization_by_email_address
from app.dao.service_sms_sender_dao import insert_service_sms_sender
from app.dao.service_user_dao import dao_get_service_user
@@ -523,6 +527,68 @@ def dao_fetch_stats_for_service_from_days(service_id, start_date, end_date):
return total_notifications, data
def dao_fetch_stats_for_service_from_hours(service_id, start_date, end_date):
start_date = get_midnight_in_utc(start_date)
end_date = get_midnight_in_utc(end_date + timedelta(days=1))
# Update to group by HOUR instead of DAY
total_substmt = (
select(
func.date_trunc("hour", NotificationAllTimeView.created_at).label("hour"), # UPDATED
Job.notification_count.label("notification_count"),
)
.join(Job, NotificationAllTimeView.job_id == Job.id)
.where(
NotificationAllTimeView.service_id == service_id,
NotificationAllTimeView.key_type != KeyType.TEST,
NotificationAllTimeView.created_at >= start_date,
NotificationAllTimeView.created_at < end_date,
)
.group_by(
Job.id,
Job.notification_count,
func.date_trunc("hour", NotificationAllTimeView.created_at), # UPDATED
)
.subquery()
)
# Also update this to group by hour
total_stmt = select(
total_substmt.c.hour, # UPDATED
func.sum(total_substmt.c.notification_count).label("total_notifications"),
).group_by(total_substmt.c.hour) # UPDATED
# Ensure we're using hourly timestamps in the response
total_notifications = {
row.hour: row.total_notifications for row in db.session.execute(total_stmt).all()
}
# Update the second query to also use "hour"
stmt = (
select(
NotificationAllTimeView.notification_type,
NotificationAllTimeView.status,
func.date_trunc("hour", NotificationAllTimeView.created_at).label("hour"), # UPDATED
func.count(NotificationAllTimeView.id).label("count"),
)
.where(
NotificationAllTimeView.service_id == service_id,
NotificationAllTimeView.key_type != KeyType.TEST,
NotificationAllTimeView.created_at >= start_date,
NotificationAllTimeView.created_at < end_date,
)
.group_by(
NotificationAllTimeView.notification_type,
NotificationAllTimeView.status,
func.date_trunc("hour", NotificationAllTimeView.created_at), # UPDATED
)
)
data = db.session.execute(stmt).all()
return total_notifications, data
def dao_fetch_stats_for_service_from_days_for_user(
service_id, start_date, end_date, user_id
):
@@ -827,3 +893,36 @@ def get_specific_days_stats(
for day, rows in grouped_data.items()
}
return stats
def get_specific_hours_stats(data, start_date, hours=None, end_date=None, total_notifications=None):
if hours is not None and end_date is not None:
raise ValueError("Only set hours OR set end_date, not both.")
elif hours is not None:
gen_range = [start_date + timedelta(hours=i) for i in range(hours)]
elif end_date is not None:
gen_range = generate_hourly_range(start_date, end_date=end_date)
else:
raise ValueError("Either hours or end_date must be set.")
# Ensure all hours exist in the output (even if empty)
grouped_data = {hour: [] for hour in gen_range}
# Group notifications based on full hour timestamps
for row in data:
notification_type, status, timestamp, count = row
row_hour = timestamp.replace(minute=0, second=0, microsecond=0)
if row_hour in grouped_data:
grouped_data[row_hour].append(row)
# Format statistics, returning only hours with results
stats = {
hour.strftime("%Y-%m-%dT%H:00:00Z"): statistics.format_statistics(
rows,
total_notifications.get(hour, 0) if total_notifications else None
)
for hour, rows in grouped_data.items() if rows
}
return stats