diff --git a/app/__init__.py b/app/__init__.py index 05ac4c0e7..598c44313 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -480,8 +480,7 @@ def make_task(app): ) ) - def on_failure(self, exc, task_id, args, kwargs, einfo): - current_app.logger.debug(f"Using {einfo}") + def on_failure(self, exc, task_id, args, kwargs, einfo=None): # enables request id tracing for these logs with self.app_context(): diff --git a/app/service/rest.py b/app/service/rest.py index e1ed62ed8..87394fe3c 100644 --- a/app/service/rest.py +++ b/app/service/rest.py @@ -1,4 +1,8 @@ +import csv +import io import itertools +import logging +import os import time import uuid from datetime import datetime, timedelta @@ -123,11 +127,26 @@ from app.utils import ( hilite, utc_now, ) +from notifications_utils.s3 import s3upload + +celery_logger = logging.getLogger(__name__) service_blueprint = Blueprint("service", __name__) register_errors(service_blueprint) +NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv" + + +def get_csv_location(service_id, upload_id): + return ( + current_app.config["CSV_UPLOAD_BUCKET"]["bucket"], + NEW_FILE_LOCATION_STRUCTURE.format(service_id, upload_id), + current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"], + current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"], + current_app.config["CSV_UPLOAD_BUCKET"]["region"], + ) + @service_blueprint.errorhandler(IntegrityError) def handle_integrity_error(exc): @@ -518,11 +537,150 @@ def get_service_history(service_id): @notify_celery.task(name="generate-notifications-report") -def generate_notifications_report(service_id, report_id, data, request_method): +def generate_notifications_report( + service_id, report_id, data, request_method, request_args +): current_app.logger.debug(hilite("ENTER generate_notifications_report")) + page = 1 + page_size = 20000 + limit_days = data.get("limit_days") + include_jobs = data.get("include_jobs", True) + include_from_test_key = data.get("include_from_test_key", False) + include_one_off = data.get("include_one_off", True) + + current_app.logger.debug(hilite("GET ALL PARAMS")) + current_app.logger.debug( + f"get pagination with {service_id} service_id filters {data} \ + limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}" + ) + start_time = time.time() + current_app.logger.debug(f"Start report generation with page.size {page_size}") + pagination = notifications_dao.get_notifications_for_service( + service_id, + filter_dict=data, + page=page, + page_size=page_size, + count_pages=False, + limit_days=limit_days, + include_jobs=include_jobs, + include_from_test_key=include_from_test_key, + include_one_off=include_one_off, + ) + current_app.logger.debug(f"Query complete at {int(time.time()-start_time)*1000}") + + for notification in pagination.items: + if notification.job_id is not None: + current_app.logger.debug( + f"Processing job_id {notification.job_id} at {int(time.time()-start_time)*1000}" + ) + notification.personalisation = get_personalisation_from_s3( + notification.service_id, + notification.job_id, + notification.job_row_number, + ) + + recipient = get_phone_number_from_s3( + notification.service_id, + notification.job_id, + notification.job_row_number, + ) + + notification.to = recipient + notification.normalised_to = recipient + + else: + notification.to = "" + notification.normalised_to = "" + + kwargs = request_args + kwargs["service_id"] = service_id + + notifications = [ + notification.serialize_for_csv() for notification in pagination.items + ] + current_app.logger.debug(hilite(f"NUMBER OF NOTFICIATIONS IS {len(notifications)}")) + + # We try and get the next page of results to work out if we need provide a pagination link to the next page + # in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous + # call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but + # this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach + # doesn't do an additional query to count all the results of which there could be millions but instead only + # asks for a single extra page of results). + + csv_bytes = io.BytesIO() + text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="") + writer = csv.writer(text_wrapper) + writer.writerows(notifications) + text_wrapper.flush() + csv_bytes.seek(0) + + bucket_name, file_location, access_key, secret_key, region = get_csv_location( + service_id, report_id + ) + if bucket_name == "": + exp_bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] + exp_region = current_app.config["CSV_UPLOAD_BUCKET"]["region"] + tier = os.getenv("NOTIFY_ENVIRONMENT") + raise Exception( + f"NO BUCKET NAME SHOULD BE: {exp_bucket} WITH REGION {exp_region} TIER {tier}" + ) + + current_app.logger.debug(f"UPLOADING THIS {csv_bytes}") + s3upload( + filedata=csv_bytes, + region=region, + bucket_name=bucket_name, + file_location=file_location, + access_key=access_key, + secret_key=secret_key, + ) + current_app.logger.debug(hilite("FINITO")) + + +@service_blueprint.route( + "//notifications-report", methods=["GET", "POST"] +) +def get_notifications_report_for_service(service_id): + current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@")) + check_suspicious_id(service_id) + report_id = str(uuid.uuid4()) + request_method = request.method + if request_method == "GET": + data = notifications_filter_schema.load(request.args) + else: + data = notifications_filter_schema.load( + MultiDict(request.get_json(silent=True)) + ) + request_args = request.args.to_dict() + current_app.logger.debug(hilite("INVOKE APPLY_ASYNC")) + generate_notifications_report.apply_async( + args=[service_id, report_id, data, request_method, request_args], + queue=QueueNames.NOTIFY, + ) + return jsonify({"report_id": report_id}), 200 + + +@service_blueprint.route("//notifications", methods=["GET", "POST"]) +def get_all_notifications_for_service(service_id): + check_suspicious_id(service_id) + current_app.logger.debug("enter get_all_notifications_for_service") + if request.method == "GET": + data = notifications_filter_schema.load(request.args) + current_app.logger.debug( + f"use GET, request.args {request.args} and data {data}" + ) + elif request.method == "POST": + # Must transform request.get_json() to MultiDict as NotificationsFilterSchema expects a MultiDict. + # Unlike request.args, request.get_json() does not return a MultiDict but instead just a dict. + data = notifications_filter_schema.load(MultiDict(request.get_json())) + current_app.logger.debug(f"use POST, request {request.get_json()} data {data}") + page = data["page"] if "page" in data else 1 - page_size = data["page_size"] if "page_size" in data else 100 - current_app.logger.debug(hilite("PAGE SIZE 100")) + page_size = ( + data["page_size"] + if "page_size" in data + else current_app.config.get("PAGE_SIZE") + ) # HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big) # Tests are relying on the value in config (20), whereas the UI seems to pass 10000 if page_size > 100: @@ -535,7 +693,7 @@ def generate_notifications_report(service_id, report_id, data, request_method): # count_pages is not being used for whether to count the number of pages, but instead as a flag # for whether to show pagination links count_pages = data.get("count_pages", True) - current_app.logger.debug(hilite("GET ALL PARAMS")) + current_app.logger.debug( f"get pagination with {service_id} service_id filters {data} \ limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}" @@ -582,8 +740,7 @@ def generate_notifications_report(service_id, report_id, data, request_method): kwargs = request.args.to_dict() kwargs["service_id"] = service_id - if True: - # if data.get("format_for_csv"): + if data.get("format_for_csv"): notifications = [ notification.serialize_for_csv() for notification in pagination.items ] @@ -591,9 +748,7 @@ def generate_notifications_report(service_id, report_id, data, request_method): notifications = notification_with_template_schema.dump( pagination.items, many=True ) - current_app.logger.debug( - hilite(f"number of notifications are {len(notifications)}") - ) + current_app.logger.debug(f"number of notifications are {len(notifications)}") # We try and get the next page of results to work out if we need provide a pagination link to the next page # in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous @@ -601,7 +756,6 @@ def generate_notifications_report(service_id, report_id, data, request_method): # this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach # doesn't do an additional query to count all the results of which there could be millions but instead only # asks for a single extra page of results). - current_app.logger.debug(hilite("ABOUT TO CALL NOTIFICATIONS DAO")) next_page_of_pagination = notifications_dao.get_notifications_for_service( service_id, filter_dict=data, @@ -614,9 +768,8 @@ def generate_notifications_report(service_id, report_id, data, request_method): include_one_off=include_one_off, error_out=False, # False so that if there are no results, it doesn't end in aborting with a 404 ) - current_app.logger.debug(hilite("NOTIFICATIONS_DAO CALLED SUCCESSFULLY")) - x = ( + return ( jsonify( notifications=notifications, page_size=page_size, @@ -633,26 +786,6 @@ def generate_notifications_report(service_id, report_id, data, request_method): ), 200, ) - current_app.logger.debug(hilite(x)) - - -@service_blueprint.route("//notifications", methods=["GET", "POST"]) -def get_all_notifications_for_service(service_id): - current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@")) - check_suspicious_id(service_id) - report_id = str(uuid.uuid4()) - request_method = request.method - if request_method == "GET": - data = notifications_filter_schema.load(request.args) - else: - data = notifications_filter_schema.load( - MultiDict(request.get_json(silent=True)) - ) - current_app.logger.debug(hilite("INVOKE APPLY_ASYNC")) - generate_notifications_report.apply_async( - args=[service_id, report_id, data, request_method], queue=QueueNames.NOTIFY - ) - return jsonify({"report_id": report_id}), 200 @service_blueprint.route(