This commit is contained in:
Kenneth Kehl
2025-08-05 08:11:22 -07:00
parent 7885f51b77
commit 270189c00e
2 changed files with 166 additions and 34 deletions

View File

@@ -480,8 +480,7 @@ def make_task(app):
)
)
def on_failure(self, exc, task_id, args, kwargs, einfo):
current_app.logger.debug(f"Using {einfo}")
def on_failure(self, exc, task_id, args, kwargs, einfo=None):
# enables request id tracing for these logs
with self.app_context():

View File

@@ -1,4 +1,8 @@
import csv
import io
import itertools
import logging
import os
import time
import uuid
from datetime import datetime, timedelta
@@ -123,11 +127,26 @@ from app.utils import (
hilite,
utc_now,
)
from notifications_utils.s3 import s3upload
celery_logger = logging.getLogger(__name__)
service_blueprint = Blueprint("service", __name__)
register_errors(service_blueprint)
NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"
def get_csv_location(service_id, upload_id):
return (
current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
NEW_FILE_LOCATION_STRUCTURE.format(service_id, upload_id),
current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
current_app.config["CSV_UPLOAD_BUCKET"]["region"],
)
@service_blueprint.errorhandler(IntegrityError)
def handle_integrity_error(exc):
@@ -518,11 +537,150 @@ def get_service_history(service_id):
@notify_celery.task(name="generate-notifications-report")
def generate_notifications_report(service_id, report_id, data, request_method):
def generate_notifications_report(
service_id, report_id, data, request_method, request_args
):
current_app.logger.debug(hilite("ENTER generate_notifications_report"))
page = 1
page_size = 20000
limit_days = data.get("limit_days")
include_jobs = data.get("include_jobs", True)
include_from_test_key = data.get("include_from_test_key", False)
include_one_off = data.get("include_one_off", True)
current_app.logger.debug(hilite("GET ALL PARAMS"))
current_app.logger.debug(
f"get pagination with {service_id} service_id filters {data} \
limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
)
start_time = time.time()
current_app.logger.debug(f"Start report generation with page.size {page_size}")
pagination = notifications_dao.get_notifications_for_service(
service_id,
filter_dict=data,
page=page,
page_size=page_size,
count_pages=False,
limit_days=limit_days,
include_jobs=include_jobs,
include_from_test_key=include_from_test_key,
include_one_off=include_one_off,
)
current_app.logger.debug(f"Query complete at {int(time.time()-start_time)*1000}")
for notification in pagination.items:
if notification.job_id is not None:
current_app.logger.debug(
f"Processing job_id {notification.job_id} at {int(time.time()-start_time)*1000}"
)
notification.personalisation = get_personalisation_from_s3(
notification.service_id,
notification.job_id,
notification.job_row_number,
)
recipient = get_phone_number_from_s3(
notification.service_id,
notification.job_id,
notification.job_row_number,
)
notification.to = recipient
notification.normalised_to = recipient
else:
notification.to = ""
notification.normalised_to = ""
kwargs = request_args
kwargs["service_id"] = service_id
notifications = [
notification.serialize_for_csv() for notification in pagination.items
]
current_app.logger.debug(hilite(f"NUMBER OF NOTFICIATIONS IS {len(notifications)}"))
# We try and get the next page of results to work out if we need provide a pagination link to the next page
# in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
# call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but
# this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach
# doesn't do an additional query to count all the results of which there could be millions but instead only
# asks for a single extra page of results).
csv_bytes = io.BytesIO()
text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="")
writer = csv.writer(text_wrapper)
writer.writerows(notifications)
text_wrapper.flush()
csv_bytes.seek(0)
bucket_name, file_location, access_key, secret_key, region = get_csv_location(
service_id, report_id
)
if bucket_name == "":
exp_bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
exp_region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
tier = os.getenv("NOTIFY_ENVIRONMENT")
raise Exception(
f"NO BUCKET NAME SHOULD BE: {exp_bucket} WITH REGION {exp_region} TIER {tier}"
)
current_app.logger.debug(f"UPLOADING THIS {csv_bytes}")
s3upload(
filedata=csv_bytes,
region=region,
bucket_name=bucket_name,
file_location=file_location,
access_key=access_key,
secret_key=secret_key,
)
current_app.logger.debug(hilite("FINITO"))
@service_blueprint.route(
"/<uuid:service_id>/notifications-report", methods=["GET", "POST"]
)
def get_notifications_report_for_service(service_id):
current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@"))
check_suspicious_id(service_id)
report_id = str(uuid.uuid4())
request_method = request.method
if request_method == "GET":
data = notifications_filter_schema.load(request.args)
else:
data = notifications_filter_schema.load(
MultiDict(request.get_json(silent=True))
)
request_args = request.args.to_dict()
current_app.logger.debug(hilite("INVOKE APPLY_ASYNC"))
generate_notifications_report.apply_async(
args=[service_id, report_id, data, request_method, request_args],
queue=QueueNames.NOTIFY,
)
return jsonify({"report_id": report_id}), 200
@service_blueprint.route("/<uuid:service_id>/notifications", methods=["GET", "POST"])
def get_all_notifications_for_service(service_id):
check_suspicious_id(service_id)
current_app.logger.debug("enter get_all_notifications_for_service")
if request.method == "GET":
data = notifications_filter_schema.load(request.args)
current_app.logger.debug(
f"use GET, request.args {request.args} and data {data}"
)
elif request.method == "POST":
# Must transform request.get_json() to MultiDict as NotificationsFilterSchema expects a MultiDict.
# Unlike request.args, request.get_json() does not return a MultiDict but instead just a dict.
data = notifications_filter_schema.load(MultiDict(request.get_json()))
current_app.logger.debug(f"use POST, request {request.get_json()} data {data}")
page = data["page"] if "page" in data else 1
page_size = data["page_size"] if "page_size" in data else 100
current_app.logger.debug(hilite("PAGE SIZE 100"))
page_size = (
data["page_size"]
if "page_size" in data
else current_app.config.get("PAGE_SIZE")
)
# HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big)
# Tests are relying on the value in config (20), whereas the UI seems to pass 10000
if page_size > 100:
@@ -535,7 +693,7 @@ def generate_notifications_report(service_id, report_id, data, request_method):
# count_pages is not being used for whether to count the number of pages, but instead as a flag
# for whether to show pagination links
count_pages = data.get("count_pages", True)
current_app.logger.debug(hilite("GET ALL PARAMS"))
current_app.logger.debug(
f"get pagination with {service_id} service_id filters {data} \
limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
@@ -582,8 +740,7 @@ def generate_notifications_report(service_id, report_id, data, request_method):
kwargs = request.args.to_dict()
kwargs["service_id"] = service_id
if True:
# if data.get("format_for_csv"):
if data.get("format_for_csv"):
notifications = [
notification.serialize_for_csv() for notification in pagination.items
]
@@ -591,9 +748,7 @@ def generate_notifications_report(service_id, report_id, data, request_method):
notifications = notification_with_template_schema.dump(
pagination.items, many=True
)
current_app.logger.debug(
hilite(f"number of notifications are {len(notifications)}")
)
current_app.logger.debug(f"number of notifications are {len(notifications)}")
# We try and get the next page of results to work out if we need provide a pagination link to the next page
# in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
@@ -601,7 +756,6 @@ def generate_notifications_report(service_id, report_id, data, request_method):
# this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach
# doesn't do an additional query to count all the results of which there could be millions but instead only
# asks for a single extra page of results).
current_app.logger.debug(hilite("ABOUT TO CALL NOTIFICATIONS DAO"))
next_page_of_pagination = notifications_dao.get_notifications_for_service(
service_id,
filter_dict=data,
@@ -614,9 +768,8 @@ def generate_notifications_report(service_id, report_id, data, request_method):
include_one_off=include_one_off,
error_out=False, # False so that if there are no results, it doesn't end in aborting with a 404
)
current_app.logger.debug(hilite("NOTIFICATIONS_DAO CALLED SUCCESSFULLY"))
x = (
return (
jsonify(
notifications=notifications,
page_size=page_size,
@@ -633,26 +786,6 @@ def generate_notifications_report(service_id, report_id, data, request_method):
),
200,
)
current_app.logger.debug(hilite(x))
@service_blueprint.route("/<uuid:service_id>/notifications", methods=["GET", "POST"])
def get_all_notifications_for_service(service_id):
current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@"))
check_suspicious_id(service_id)
report_id = str(uuid.uuid4())
request_method = request.method
if request_method == "GET":
data = notifications_filter_schema.load(request.args)
else:
data = notifications_filter_schema.load(
MultiDict(request.get_json(silent=True))
)
current_app.logger.debug(hilite("INVOKE APPLY_ASYNC"))
generate_notifications_report.apply_async(
args=[service_id, report_id, data, request_method], queue=QueueNames.NOTIFY
)
return jsonify({"report_id": report_id}), 200
@service_blueprint.route(