From d53e033d2af5539449759b58d355cda66e6fe8fb Mon Sep 17 00:00:00 2001
From: Kenneth Kehl <@kkehl@flexion.us>
Date: Tue, 5 Aug 2025 13:41:57 -0700
Subject: [PATCH] add task and command

---
 app/celery/tasks.py | 245 +++++++++++++++++++++++---------------------
 app/commands.py     |  10 +-
 app/config.py       |   5 +
 app/service/rest.py | 146 +-------------------------
 4 files changed, 144 insertions(+), 262 deletions(-)

diff --git a/app/celery/tasks.py b/app/celery/tasks.py
index d31232155..b665f2d09 100644
--- a/app/celery/tasks.py
+++ b/app/celery/tasks.py
@@ -1,4 +1,8 @@
+import csv
+import io
 import json
+import os
+import uuid
 
 import gevent
 from celery.signals import task_postrun
@@ -10,6 +14,7 @@ from app import create_uuid, encryption, notify_celery
 from app.aws import s3
 from app.celery import provider_tasks
 from app.config import Config, QueueNames
+from app.dao import notifications_dao
 from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
 from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
 from app.dao.notifications_dao import (
@@ -19,7 +24,7 @@ from app.dao.notifications_dao import (
 from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
 from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
 from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
-from app.dao.services_dao import dao_fetch_service_by_id
+from app.dao.services_dao import dao_fetch_all_services, dao_fetch_service_by_id
 from app.dao.templates_dao import dao_get_template_by_id
 from app.enums import JobStatus, KeyType, NotificationType
 from app.errors import TotalRequestsError
@@ -32,122 +37,7 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
 from app.service.utils import service_allowed_to_send_to
 from app.utils import DATETIME_FORMAT, hilite, utc_now
 from notifications_utils.recipients import RecipientCSV
-
-# @notify_celery.task(name="process-job")
-# def generate_notifications_report(service_id, report_id, data, request_method):
-#     current_app.logger.debug(hilite("ENTER generate_notifications_report"))
-#     page = data["page"] if "page" in data else 1
-#     page_size = (
-#         data["page_size"]
-#         if "page_size" in data
-#         else current_app.config.get("PAGE_SIZE")
-#     )
-#     # HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big)
-#     # Tests are relying on the value in config (20), whereas the UI seems to pass 10000
-#     if page_size > 100:
-#         page_size = 100
-#     limit_days = data.get("limit_days")
-#     include_jobs = data.get("include_jobs", True)
-#     include_from_test_key = data.get("include_from_test_key", False)
-#     include_one_off = data.get("include_one_off", True)
-
-#     # count_pages is not being used for whether to count the number of pages, but instead as a flag
-#     # for whether to show pagination links
-#     count_pages = data.get("count_pages", True)
-
-#     current_app.logger.debug(
-#         f"get pagination with {service_id} service_id filters {data} \
-#         limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
-#     )
-#     start_time = time.time()
-#     current_app.logger.debug(f"Start report generation with page.size {page_size}")
-#     pagination = notifications_dao.get_notifications_for_service(
-#         service_id,
-#         filter_dict=data,
-#         page=page,
-#         page_size=page_size,
-#         count_pages=False,
-#         limit_days=limit_days,
-#         include_jobs=include_jobs,
-#         include_from_test_key=include_from_test_key,
-#         include_one_off=include_one_off,
-#     )
-#     current_app.logger.debug(f"Query complete at {int(time.time()-start_time)*1000}")
-
-#     for notification in pagination.items:
-#         if notification.job_id is not None:
-#             current_app.logger.debug(
-#                 f"Processing job_id {notification.job_id} at {int(time.time()-start_time)*1000}"
-#             )
-#             notification.personalisation = get_personalisation_from_s3(
-#                 notification.service_id,
-#                 notification.job_id,
-#                 notification.job_row_number,
-#             )
-
-#             recipient = get_phone_number_from_s3(
-#                 notification.service_id,
-#                 notification.job_id,
-#                 notification.job_row_number,
-#             )
-
-#             notification.to = recipient
-#             notification.normalised_to = recipient
-
-#         else:
-#             notification.to = ""
-#             notification.normalised_to = ""
-
-#     kwargs = request.args.to_dict()
-#     kwargs["service_id"] = service_id
-
-#     if data.get("format_for_csv"):
-#         notifications = [
-#             notification.serialize_for_csv() for notification in pagination.items
-#         ]
-#     else:
-#         notifications = notification_with_template_schema.dump(
-#             pagination.items, many=True
-#         )
-#     current_app.logger.debug(f"number of notifications are {len(notifications)}")
-
-#     # We try and get the next page of results to work out if we need provide a pagination link to the next page
-#     # in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
-#     # call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but
-#     # this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach
-#     # doesn't do an additional query to count all the results of which there could be millions but instead only
-#     # asks for a single extra page of results).
-#     next_page_of_pagination = notifications_dao.get_notifications_for_service(
-#         service_id,
-#         filter_dict=data,
-#         page=page + 1,
-#         page_size=page_size,
-#         count_pages=False,
-#         limit_days=limit_days,
-#         include_jobs=include_jobs,
-#         include_from_test_key=include_from_test_key,
-#         include_one_off=include_one_off,
-#         error_out=False,  # False so that if there are no results, it doesn't end in aborting with a 404
-#     )
-
-#     x = (
-#         jsonify(
-#             notifications=notifications,
-#             page_size=page_size,
-#             links=(
-#                 get_prev_next_pagination_links(
-#                     page,
-#                     len(next_page_of_pagination.items),
-#                     ".get_all_notifications_for_service",
-#                     **kwargs,
-#                 )
-#                 if count_pages
-#                 else {}
-#             ),
-#         ),
-#         200,
-#     )
-#     current_app.logger.debug(x)
+from notifications_utils.s3 import s3upload
 
 
 @notify_celery.task(name="process-job")
@@ -662,3 +552,124 @@ def process_incomplete_job(job_id):
         process_row(row, template, job, job.service, sender_id=sender_id)
 
     job_complete(job, resumed=True)
+
+
+def _generate_notifications_report(service_id, report_id, limit_days):
+    current_app.logger.debug(hilite("ENTER _generate_notifications_report()"))
+    page = 1
+    page_size = 20000
+    include_jobs = True
+    include_from_test_key = False
+    include_one_off = True
+
+    data = {
+        "limit_days": limit_days,
+        "include_jobs": include_jobs,
+        "include_from_test_key": include_from_test_key,
+        "include_one_off": include_one_off,
+    }
+    pagination = notifications_dao.get_notifications_for_service(
+        service_id,
+        filter_dict=data,
+        page=page,
+        page_size=page_size,
+        count_pages=False,
+        limit_days=limit_days,
+        include_jobs=include_jobs,
+        include_from_test_key=include_from_test_key,
+        include_one_off=include_one_off,
+    )
+    count = 1
+    for notification in pagination.items:
+        if notification.job_id is not None:
+            current_app.logger.debug(
+                f"Processing job_id {notification.job_id} which is row {count}"
+            )
+            count += 1
+            notification.personalisation = s3.get_personalisation_from_s3(
+                notification.service_id,
+                notification.job_id,
+                notification.job_row_number,
+            )
+
+            recipient = s3.get_phone_number_from_s3(
+                notification.service_id,
+                notification.job_id,
+                notification.job_row_number,
+            )
+
+            notification.to = recipient
+            notification.normalised_to = recipient
+
+        else:
+            notification.to = ""
+            notification.normalised_to = ""
+
+    notifications = [
+        notification.serialize_for_csv() for notification in pagination.items
+    ]
+    current_app.logger.debug(hilite(f"NUMBER OF NOTIFICATIONS IS {len(notifications)}"))
+
+    # Serialize the rows to CSV entirely in memory. The csv module writes
+    # text, while s3upload needs a bytes file object, so we wrap a BytesIO
+    # buffer in a TextIOWrapper: the writer emits UTF-8 text through the
+    # wrapper, flush() pushes any buffered text down into the bytes buffer,
+    # and seek(0) rewinds it so the upload reads the file from the start
+    # rather than from the current end-of-file position.
+
+    csv_bytes = io.BytesIO()
+    text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="")
+    writer = csv.writer(text_wrapper)
+    writer.writerows(notifications)
+    text_wrapper.flush()
+    csv_bytes.seek(0)
+
+    bucket_name, file_location, access_key, secret_key, region = get_csv_location(
+        service_id, report_id
+    )
+    if bucket_name == "":
+        exp_bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
+        exp_region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+        tier = os.getenv("NOTIFY_ENVIRONMENT")
+        raise Exception(
+            f"No bucket name configured; expected {exp_bucket} in region {exp_region} (tier {tier})"
+        )
+
+    current_app.logger.debug(hilite(f"UPLOADING THIS {file_location}"))
+    s3upload(
+        filedata=csv_bytes,
+        region=region,
+        bucket_name=bucket_name,
+        file_location=file_location,
+        access_key=access_key,
+        secret_key=secret_key,
+    )
+    current_app.logger.debug(hilite("FINITO"))
+
+
+@notify_celery.task(name="generate-notifications-reports")
+def generate_notification_reports_task():
+    current_app.logger.debug(hilite("ENTER generate_notification_reports_task"))
+    services = dao_fetch_all_services(only_active=True)
+    for service in services:
+
+        current_app.logger.debug(hilite(f"GENERATING REPORTS FOR SERVICE {service.id}"))
+        limit_days = [1, 3, 5, 7]
+        for limit_day in limit_days:
+
+            report_id = f"{str(uuid.uuid4())}-{limit_day}-day"
+            _generate_notifications_report(service.id, report_id, limit_day)
+    current_app.logger.info("Notifications report generation complete")
+
+
+NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"
+
+
+def get_csv_location(service_id, upload_id):
+    return (
+        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
+        NEW_FILE_LOCATION_STRUCTURE.format(service_id, upload_id),
+        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
+        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
+        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
+    )
diff --git a/app/commands.py b/app/commands.py
index e4fdc7cb5..020b0610d 100644
--- a/app/commands.py
+++ b/app/commands.py
@@ -18,7 +18,10 @@ from sqlalchemy.orm.exc import NoResultFound
 
 from app import db, redis_store
 from app.aws import s3
 from app.celery.nightly_tasks import cleanup_unfinished_jobs
-from app.celery.tasks import process_row
+from app.celery.tasks import (
+    generate_notification_reports_task,
+    process_row,
+)
 from app.dao.annual_billing_dao import (
     dao_create_or_update_annual_billing_for_year,
     set_default_free_allowance_for_service,
 )
@@ -810,6 +813,11 @@ def update_templates():
     _clear_templates_from_cache()
 
 
+@notify_command(name="generate-notification-reports")
+def generate_notification_reports():
+    generate_notification_reports_task()
+
+
 def _clear_templates_from_cache():
     # When we update-templates in the db, we need to make sure to delete them
     # from redis, otherwise the old versions will stick around forever.
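
Usage note: the generate-notification-reports command above calls the task body synchronously, so a report pass can be run by hand instead of waiting for the beat schedule. Assuming @notify_command registers commands under the repo's usual `flask command` group (an assumption, not shown in this patch), the invocation would look like:

    flask command generate-notification-reports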
diff --git a/app/config.py b/app/config.py
index b203fe4f9..16267c63e 100644
--- a/app/config.py
+++ b/app/config.py
@@ -287,6 +287,11 @@ class Config(object):
             "schedule": crontab(minute="*/30"),
             "options": {"queue": QueueNames.PERIODIC},
         },
+        "generate-notifications-reports": {
+            "task": "generate-notifications-reports",
+            "schedule": crontab(hour=1, minute=0),
+            "options": {"queue": QueueNames.PERIODIC},
+        },
         "regenerate-job-cache-on-startup": {
             "task": "regenerate-job-cache",
             "schedule": crontab(
diff --git a/app/service/rest.py b/app/service/rest.py
index c4d94e0d1..aa1d89be5 100644
--- a/app/service/rest.py
+++ b/app/service/rest.py
@@ -1,10 +1,6 @@
-import csv
-import io
 import itertools
 import logging
-import os
 import time
-import uuid
 from datetime import datetime, timedelta
 from zoneinfo import ZoneInfo
@@ -15,7 +11,7 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm.exc import NoResultFound
 from werkzeug.datastructures import MultiDict
 
-from app import db, notify_celery
+from app import db
 from app.aws.s3 import get_personalisation_from_s3, get_phone_number_from_s3
 from app.config import QueueNames
 from app.dao import fact_notification_status_dao, notifications_dao
@@ -124,10 +120,8 @@ from app.user.users_schema import post_set_permissions_schema
 from app.utils import (
     check_suspicious_id,
     get_prev_next_pagination_links,
-    hilite,
     utc_now,
 )
-from notifications_utils.s3 import s3upload
 
 celery_logger = logging.getLogger(__name__)
 
@@ -135,18 +129,6 @@ service_blueprint = Blueprint("service", __name__)
 register_errors(service_blueprint)
 
-NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"
-
-
-def get_csv_location(service_id, upload_id):
-    return (
-        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
-        NEW_FILE_LOCATION_STRUCTURE.format(service_id, upload_id),
-        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
-        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
-        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
-    )
 
 @service_blueprint.errorhandler(IntegrityError)
 def handle_integrity_error(exc):
@@ -536,131 +518,7 @@ def get_service_history(service_id):
     return jsonify(data=data)
 
 
-@notify_celery.task(name="generate-notifications-report")
-def generate_notifications_report(
-    service_id, report_id, data, request_method, request_args
-):
-    current_app.logger.debug(hilite("ENTER generate_notifications_report"))
-    page = 1
-    page_size = 20000
-    limit_days = data.get("limit_days")
-    include_jobs = data.get("include_jobs", True)
-    include_from_test_key = data.get("include_from_test_key", False)
-    include_one_off = data.get("include_one_off", True)
-
-    current_app.logger.debug(hilite("GET ALL PARAMS"))
-    current_app.logger.debug(
-        f"get pagination with {service_id} service_id filters {data} \
-        limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
-    )
-    start_time = time.time()
-    current_app.logger.debug(f"Start report generation with page.size {page_size}")
-    pagination = notifications_dao.get_notifications_for_service(
-        service_id,
-        filter_dict=data,
-        page=page,
-        page_size=page_size,
-        count_pages=False,
-        limit_days=limit_days,
-        include_jobs=include_jobs,
-        include_from_test_key=include_from_test_key,
-        include_one_off=include_one_off,
-    )
-    current_app.logger.debug(f"Query complete at {int(time.time()-start_time)*1000}")
-    current_app.logger.debug(f"HOW MANY ITEMS IN PAGINATION? {len(pagination.items)}")
{len(pagination.items)}") - count = 1 - for notification in pagination.items: - if notification.job_id is not None: - current_app.logger.debug( - f"Processing job_id {notification.job_id} which is row {count}" - ) - count = count + 1 - notification.personalisation = get_personalisation_from_s3( - notification.service_id, - notification.job_id, - notification.job_row_number, - ) - - recipient = get_phone_number_from_s3( - notification.service_id, - notification.job_id, - notification.job_row_number, - ) - - notification.to = recipient - notification.normalised_to = recipient - - else: - notification.to = "" - notification.normalised_to = "" - - kwargs = request_args - kwargs["service_id"] = service_id - - notifications = [ - notification.serialize_for_csv() for notification in pagination.items - ] - current_app.logger.debug(hilite(f"NUMBER OF NOTFICIATIONS IS {len(notifications)}")) - - # We try and get the next page of results to work out if we need provide a pagination link to the next page - # in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous - # call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but - # this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach - # doesn't do an additional query to count all the results of which there could be millions but instead only - # asks for a single extra page of results). - - csv_bytes = io.BytesIO() - text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="") - writer = csv.writer(text_wrapper) - writer.writerows(notifications) - text_wrapper.flush() - csv_bytes.seek(0) - - bucket_name, file_location, access_key, secret_key, region = get_csv_location( - service_id, report_id - ) - if bucket_name == "": - exp_bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] - exp_region = current_app.config["CSV_UPLOAD_BUCKET"]["region"] - tier = os.getenv("NOTIFY_ENVIRONMENT") - raise Exception( - f"NO BUCKET NAME SHOULD BE: {exp_bucket} WITH REGION {exp_region} TIER {tier}" - ) - - current_app.logger.debug(f"UPLOADING THIS {csv_bytes}") - s3upload( - filedata=csv_bytes, - region=region, - bucket_name=bucket_name, - file_location=file_location, - access_key=access_key, - secret_key=secret_key, - ) - current_app.logger.debug(hilite("FINITO")) - - -@service_blueprint.route("//notifications-report", methods=["GET", "POST"]) -def get_notifications_report_for_service(service_id): - current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@")) - check_suspicious_id(service_id) - report_id = str(uuid.uuid4()) - request_method = request.method - if request_method == "GET": - data = notifications_filter_schema.load(request.args) - else: - data = notifications_filter_schema.load( - MultiDict(request.get_json(silent=True)) - ) - request_args = request.args.to_dict() - current_app.logger.debug(hilite("INVOKE APPLY_ASYNC")) - generate_notifications_report.apply_async( - args=[service_id, report_id, data, request_method, request_args], - queue=QueueNames.NOTIFY, - ) - return jsonify({"report_id": report_id}), 200 - - -@service_blueprint.route("//notifications", methods=["GET", "POST"]) +@service_blueprint.route("//notificationsx", methods=["GET", "POST"]) def get_all_notifications_for_service(service_id): check_suspicious_id(service_id) current_app.logger.debug("enter get_all_notifications_for_service")