Files
notifications-admin/app/main/views/activity.py
2025-08-25 17:45:14 -04:00

216 lines
6.9 KiB
Python

import gevent
from flask import abort, render_template, request, url_for
from app import current_service, job_api_client
from app.enums import NotificationStatus, ServicePermission
from app.formatters import get_time_left
from app.main import main
from app.utils.pagination import (
generate_next_dict,
generate_pagination_pages,
generate_previous_dict,
get_page_from_request,
)
from app.utils.user import user_has_permissions
def get_report_info(service_id, report_name, s3_config):
    """Return availability metadata for one S3 activity report.

    Builds the S3 key from ``service_id`` and ``report_name`` via
    NEW_FILE_LOCATION_STRUCTURE and, when the object exists and is
    non-empty, reports its human-readable size.

    Args:
        service_id: identifier of the service the report belongs to.
        report_name: report key suffix, e.g. ``"1-day-report"``.
        s3_config: dict with ``"bucket"``, ``"access_key_id"``,
            ``"secret_access_key"`` and ``"region"`` entries.

    Returns:
        dict: ``{"available": bool, "size": str | None}``. Any failure
        (missing object, bad credentials, import error) yields the
        "not available" result instead of raising, because this runs
        best-effort inside a gevent greenlet.
    """
    try:
        # Imported lazily so any import-time failure is absorbed by the
        # blanket handler below rather than breaking module import.
        from app.s3_client import check_s3_file_exists, get_s3_object
        from app.s3_client.s3_csv_client import NEW_FILE_LOCATION_STRUCTURE

        key = NEW_FILE_LOCATION_STRUCTURE.format(service_id, report_name)
        obj = get_s3_object(
            s3_config["bucket"],
            key,
            s3_config["access_key_id"],
            s3_config["secret_access_key"],
            s3_config["region"],
        )
        if check_s3_file_exists(obj):
            # check_s3_file_exists already called obj.load(), so the
            # object's metadata (content_length) is populated.
            size_bytes = obj.content_length
            # Only advertise the report if it actually has content.
            if size_bytes > 0:
                return {"available": True, "size": _format_file_size(size_bytes)}
    except Exception:  # nosec B110 - deliberately best-effort; see docstring
        pass
    return {"available": False, "size": None}


def _format_file_size(size_bytes):
    """Format a positive byte count as ``"N B"``, ``"N.N KB"`` or ``"N.N MB"``."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    if size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    return f"{size_bytes / (1024 * 1024):.1f} MB"
def get_download_availability(service_id):
    """Check, concurrently, which of the four activity reports exist in S3.

    Spawns one gevent greenlet per retention window (1/3/5/7 days) and
    waits for all of them.

    Returns:
        dict: keys ``report_1_day`` … ``report_7_day``, each mapping to
        the ``{"available": ..., "size": ...}`` dict from
        get_report_info (or the "not available" default when a greenlet
        produced no value).
    """
    from flask import current_app

    # Snapshot config before spawning: greenlets must not touch
    # current_app, which is bound to the request/app context.
    bucket_settings = current_app.config["CSV_UPLOAD_BUCKET"]
    s3_config = {
        "bucket": bucket_settings["bucket"],
        "access_key_id": bucket_settings["access_key_id"],
        "secret_access_key": bucket_settings["secret_access_key"],
        "region": bucket_settings["region"],
    }

    day_periods = (1, 3, 5, 7)
    tasks = [
        gevent.spawn(get_report_info, service_id, f"{days}-day-report", s3_config)
        for days in day_periods
    ]
    gevent.joinall(tasks)

    return {
        f"report_{days}_day": task.value or {"available": False, "size": None}
        for days, task in zip(day_periods, tasks)
    }
def get_download_links(message_type):
    """Build the CSV download URL for each retention window.

    Returns:
        dict: ``download_link_one_day`` … ``download_link_seven_day``
        mapping to ``download_notifications_csv`` URLs, carrying through
        the current request's ``status`` query argument.
    """
    status_filter = request.args.get("status")
    return {
        f"download_link_{period}": url_for(
            ".download_notifications_csv",
            service_id=current_service.id,
            message_type=message_type,
            status=status_filter,
            number_of_days=period,
        )
        for period in ("one_day", "three_day", "five_day", "seven_day")
    }
def get_filtered_jobs(service_id, page):
    """Fetch one page of jobs, optionally limited by the ``?filter=`` arg.

    Recognised filter values map to a day limit; anything else (or no
    filter) fetches the unrestricted page.
    """
    days_for_filter = {"24hours": 1, "3days": 3, "7days": 7}
    limit_days = days_for_filter.get(request.args.get("filter"))
    if limit_days:
        return job_api_client.get_page_of_jobs(
            service_id, page=page, limit_days=limit_days, use_processing_time=True
        )
    return job_api_client.get_page_of_jobs(service_id, page=page)
@main.route("/activity/services/<uuid:service_id>")
@user_has_permissions(ServicePermission.VIEW_ACTIVITY)
def all_jobs_activity(service_id):
    """Render the all-activity page: job list, pagination and CSV links.

    Args:
        service_id: UUID of the service whose activity is shown.

    Returns:
        Rendered ``views/activity/all-activity.html`` response.
    """
    # NOTE(review): hard-coded retention window — confirm against the
    # service's actual data retention settings.
    service_data_retention_days = 8
    page = get_page_from_request()
    jobs = get_filtered_jobs(service_id, page)
    all_jobs_dict = generate_job_dict(jobs)
    prev_page, next_page, pagination = handle_pagination(jobs, service_id, page)
    # Fix: was ("sms",) — an accidental trailing-comma tuple. url_for
    # serialized the one-element tuple to the same query string, so the
    # generated URLs are unchanged, but the value is now the plain
    # string the parameter name implies.
    message_type = "sms"
    download_availability = get_download_availability(service_id)
    download_links = get_download_links(message_type)
    return render_template(
        "views/activity/all-activity.html",
        all_jobs_dict=all_jobs_dict,
        service_data_retention_days=service_data_retention_days,
        next_page=next_page,
        prev_page=prev_page,
        pagination=pagination,
        total_jobs=jobs.get("total", 0),
        **download_availability,
        **download_links,
    )
def handle_pagination(jobs, service_id, page):
    """Build prev/next links and pagination data for the activity page.

    Args:
        jobs: jobs API response dict; reads ``total``, ``page_size`` and
            ``links`` (all optional, with numeric defaults 0 and 50).
        service_id: service UUID used when building page URLs.
        page: current page number, or None when ``?page=`` was invalid.

    Returns:
        tuple: ``(prev_page, next_page, pagination)`` where the first
        two are link dicts (or None) and the third is the pagination
        widget data.

    Raises:
        404 via ``abort`` when ``page`` is None.
    """
    if page is None:
        abort(404, "Invalid page argument ({}).".format(request.args.get("page")))
    url_args = {}
    if request.args.get("filter"):
        # Preserve the active time filter across page links.
        url_args["filter"] = request.args.get("filter")

    total_items = jobs.get("total", 0)
    page_size = jobs.get("page_size", 50)
    total_pages = (total_items + page_size - 1) // page_size  # ceiling division
    has_next_link = jobs.get("links", {}).get("next") is not None

    prev_page = (
        generate_previous_dict("main.all_jobs_activity", service_id, page, url_args)
        if page > 1
        else None
    )
    next_page = (
        generate_next_dict("main.all_jobs_activity", service_id, page, url_args)
        # Fix: was the literal 50 — use the actual page size so the check
        # stays correct if the API page size ever differs.
        if has_next_link and total_items > page_size and page < total_pages
        else None
    )
    # Fix: previously passed jobs.get("total", {}) / jobs.get("page_size", {});
    # {} is the wrong fallback type for numeric values — reuse the
    # correctly-defaulted locals computed above.
    pagination = generate_pagination_pages(total_items, page_size, page)
    return prev_page, next_page, pagination
def get_job_statistics(job, status):
    """Return the count recorded for ``status`` in the job's statistics.

    Returns None when no statistics entry matches (or none exist).
    """
    counts_for_status = (
        entry.get("count")
        for entry in job.get("statistics", [])
        if entry.get("status") == status
    )
    return next(counts_for_status, None)
def create_job_dict_entry(job):
    """Flatten one job API record into the template-friendly dict.

    Args:
        job: job dict from the jobs API; reads ``id``, ``created_at``,
            ``processing_started``, ``created_by``, ``template_name``
            and ``statistics``.

    Returns:
        dict with download/view links, timing and delivery counts.
    """
    job_id = job.get("id")
    # Downloads stop being offered once get_time_left reports expiry.
    can_download = get_time_left(job.get("created_at")) != "Data no longer available"
    if can_download:
        download_link = url_for(
            ".view_job_csv", service_id=current_service.id, job_id=job_id
        )
    else:
        download_link = None
    return {
        "job_id": job_id,
        "can_download": can_download,
        "download_link": download_link,
        "view_job_link": url_for(
            ".view_job", service_id=current_service.id, job_id=job_id
        ),
        # Prefer the processing start time; fall back to creation time.
        "activity_time": job.get("processing_started") or job.get("created_at"),
        "created_by": job.get("created_by"),
        "template_name": job.get("template_name"),
        "delivered_count": get_job_statistics(job, NotificationStatus.DELIVERED),
        "failed_count": get_job_statistics(job, NotificationStatus.FAILED),
    }
def generate_job_dict(jobs):
    """Convert the jobs API response into template entries.

    Returns an empty list when the response is falsy or has no data.
    """
    data = (jobs or {}).get("data")
    if not data:
        return []
    return [create_job_dict_entry(job) for job in data]