add task and command

Kenneth Kehl
2025-08-05 13:41:57 -07:00
parent bb68370eae
commit d53e033d2a
4 changed files with 144 additions and 262 deletions

View File

@@ -1,4 +1,8 @@
import csv
import io
import json
import os
import uuid
import gevent
from celery.signals import task_postrun
@@ -10,6 +14,7 @@ from app import create_uuid, encryption, notify_celery
from app.aws import s3
from app.celery import provider_tasks
from app.config import Config, QueueNames
from app.dao import notifications_dao
from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id
from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job
from app.dao.notifications_dao import (
@@ -19,7 +24,7 @@ from app.dao.notifications_dao import (
from app.dao.service_email_reply_to_dao import dao_get_reply_to_by_id
from app.dao.service_inbound_api_dao import get_service_inbound_api_for_service
from app.dao.service_sms_sender_dao import dao_get_service_sms_senders_by_id
from app.dao.services_dao import dao_fetch_service_by_id
from app.dao.services_dao import dao_fetch_all_services, dao_fetch_service_by_id
from app.dao.templates_dao import dao_get_template_by_id
from app.enums import JobStatus, KeyType, NotificationType
from app.errors import TotalRequestsError
@@ -32,122 +37,7 @@ from app.serialised_models import SerialisedService, SerialisedTemplate
from app.service.utils import service_allowed_to_send_to
from app.utils import DATETIME_FORMAT, hilite, utc_now
from notifications_utils.recipients import RecipientCSV
# @notify_celery.task(name="process-job")
# def generate_notifications_report(service_id, report_id, data, request_method):
# current_app.logger.debug(hilite("ENTER generate_notifications_report"))
# page = data["page"] if "page" in data else 1
# page_size = (
# data["page_size"]
# if "page_size" in data
# else current_app.config.get("PAGE_SIZE")
# )
# # HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big)
# # Tests are relying on the value in config (20), whereas the UI seems to pass 10000
# if page_size > 100:
# page_size = 100
# limit_days = data.get("limit_days")
# include_jobs = data.get("include_jobs", True)
# include_from_test_key = data.get("include_from_test_key", False)
# include_one_off = data.get("include_one_off", True)
# # count_pages is not being used for whether to count the number of pages, but instead as a flag
# # for whether to show pagination links
# count_pages = data.get("count_pages", True)
# current_app.logger.debug(
# f"get pagination with {service_id} service_id filters {data} \
# limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
# )
# start_time = time.time()
# current_app.logger.debug(f"Start report generation with page.size {page_size}")
# pagination = notifications_dao.get_notifications_for_service(
# service_id,
# filter_dict=data,
# page=page,
# page_size=page_size,
# count_pages=False,
# limit_days=limit_days,
# include_jobs=include_jobs,
# include_from_test_key=include_from_test_key,
# include_one_off=include_one_off,
# )
# current_app.logger.debug(f"Query complete at {int(time.time()-start_time)*1000}")
# for notification in pagination.items:
# if notification.job_id is not None:
# current_app.logger.debug(
# f"Processing job_id {notification.job_id} at {int(time.time()-start_time)*1000}"
# )
# notification.personalisation = get_personalisation_from_s3(
# notification.service_id,
# notification.job_id,
# notification.job_row_number,
# )
# recipient = get_phone_number_from_s3(
# notification.service_id,
# notification.job_id,
# notification.job_row_number,
# )
# notification.to = recipient
# notification.normalised_to = recipient
# else:
# notification.to = ""
# notification.normalised_to = ""
# kwargs = request.args.to_dict()
# kwargs["service_id"] = service_id
# if data.get("format_for_csv"):
# notifications = [
# notification.serialize_for_csv() for notification in pagination.items
# ]
# else:
# notifications = notification_with_template_schema.dump(
# pagination.items, many=True
# )
# current_app.logger.debug(f"number of notifications are {len(notifications)}")
# # We try and get the next page of results to work out if we need provide a pagination link to the next page
# # in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
# # call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but
# # this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach
# # doesn't do an additional query to count all the results of which there could be millions but instead only
# # asks for a single extra page of results).
# next_page_of_pagination = notifications_dao.get_notifications_for_service(
# service_id,
# filter_dict=data,
# page=page + 1,
# page_size=page_size,
# count_pages=False,
# limit_days=limit_days,
# include_jobs=include_jobs,
# include_from_test_key=include_from_test_key,
# include_one_off=include_one_off,
# error_out=False, # False so that if there are no results, it doesn't end in aborting with a 404
# )
# x = (
# jsonify(
# notifications=notifications,
# page_size=page_size,
# links=(
# get_prev_next_pagination_links(
# page,
# len(next_page_of_pagination.items),
# ".get_all_notifications_for_service",
# **kwargs,
# )
# if count_pages
# else {}
# ),
# ),
# 200,
# )
# current_app.logger.debug(x)
from notifications_utils.s3 import s3upload
@notify_celery.task(name="process-job")
@@ -662,3 +552,124 @@ def process_incomplete_job(job_id):
        process_row(row, template, job, job.service, sender_id=sender_id)
    job_complete(job, resumed=True)
def _generate_notifications_report(service_id, report_id, limit_days):
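    # Build a one-page CSV report (up to 20,000 rows) of this service's
    # notifications over the last `limit_days` days and upload it to S3.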
current_app.logger.debug(hilite("ENTER _generate_notifications_report()"))
page = 1
page_size = 20000
include_jobs = True
include_from_test_key = False
include_one_off = True
data = {
"limit_days": limit_days,
"include_jobs": True,
"include_from_test_key": False,
"include_one_off": True,
}
pagination = notifications_dao.get_notifications_for_service(
service_id,
filter_dict=data,
page=page,
page_size=page_size,
count_pages=False,
limit_days=limit_days,
include_jobs=include_jobs,
include_from_test_key=include_from_test_key,
include_one_off=include_one_off,
)
    count = 1
    for notification in pagination.items:
        if notification.job_id is not None:
            current_app.logger.debug(
                f"Processing job_id {notification.job_id} which is row {count}"
            )
            count += 1
            notification.personalisation = s3.get_personalisation_from_s3(
                notification.service_id,
                notification.job_id,
                notification.job_row_number,
            )
            recipient = s3.get_phone_number_from_s3(
                notification.service_id,
                notification.job_id,
                notification.job_row_number,
            )
            notification.to = recipient
            notification.normalised_to = recipient
        else:
            notification.to = ""
            notification.normalised_to = ""
    notifications = [
        notification.serialize_for_csv() for notification in pagination.items
    ]
current_app.logger.debug(hilite(f"NUMBER OF NOTFICIATIONS IS {len(notifications)}"))
# We try and get the next page of results to work out if we need provide a pagination link to the next page
# in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
# call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but
# this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach
# doesn't do an additional query to count all the results of which there could be millions but instead only
# asks for a single extra page of results).
    csv_bytes = io.BytesIO()
    text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="")
    writer = csv.writer(text_wrapper)
    writer.writerows(notifications)
    text_wrapper.flush()
    csv_bytes.seek(0)
    bucket_name, file_location, access_key, secret_key, region = get_csv_location(
        service_id, report_id
    )
    if bucket_name == "":
        exp_bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
        exp_region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
        tier = os.getenv("NOTIFY_ENVIRONMENT")
        raise Exception(
            f"NO BUCKET NAME. SHOULD BE: {exp_bucket} WITH REGION {exp_region} TIER {tier}"
        )
    current_app.logger.debug(hilite(f"UPLOADING THIS {file_location}"))
    s3upload(
        filedata=csv_bytes,
        region=region,
        bucket_name=bucket_name,
        file_location=file_location,
        access_key=access_key,
        secret_key=secret_key,
    )
    current_app.logger.debug(hilite("FINITO"))
@notify_celery.task(name="generate-notifications-reports")
def generate_notification_reports_task():
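    # Nightly sweep: for every active service, build 1-, 3-, 5- and 7-day
    # notification reports. The reports are generated synchronously, one
    # service at a time, inside this single task.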
current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@"))
services = dao_fetch_all_services(only_active=True)
for service in services:
current_app.logger.debug(hilite("INVOKE APPLY_ASYNC"))
limit_days = [1, 3, 5, 7]
for limit_day in limit_days:
report_id = f"{str(uuid.uuid4())}-{limit_day}-day"
_generate_notifications_report(service.id, report_id, limit_day)
current_app.logger.info("Notifications report generation complete")
NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"
def get_csv_location(service_id, upload_id):
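    # Returns (bucket_name, file_location, access_key, secret_key, region)
    # for a report CSV, keyed by service id and upload/report id.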
    return (
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        NEW_FILE_LOCATION_STRUCTURE.format(service_id, upload_id),
        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
    )
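Reviewer note: the BytesIO/TextIOWrapper round-trip used above is easy to get subtly wrong (the wrapper must be flushed before the buffer is rewound, and the wrapper must stay alive while the bytes are read). A minimal, self-contained sketch of the same pattern, with hypothetical rows standing in for the serialize_for_csv() output:

import csv
import io

# Hypothetical rows standing in for notification.serialize_for_csv() output.
rows = [
    ["recipient", "template", "status"],
    ["+15555550100", "reminder", "delivered"],
]

csv_bytes = io.BytesIO()
# newline="" lets the csv module control line endings itself.
text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="")
csv.writer(text_wrapper).writerows(rows)
text_wrapper.flush()  # push buffered text into csv_bytes
csv_bytes.seek(0)  # rewind so the uploader reads from the start
# csv_bytes is now what s3upload(filedata=csv_bytes, ...) expects.
print(csv_bytes.read().decode("utf-8"))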

View File

@@ -18,7 +18,10 @@ from sqlalchemy.orm.exc import NoResultFound
from app import db, redis_store
from app.aws import s3
from app.celery.nightly_tasks import cleanup_unfinished_jobs
from app.celery.tasks import process_row
from app.celery.tasks import (
    generate_notification_reports_task,
    process_row,
)
from app.dao.annual_billing_dao import (
dao_create_or_update_annual_billing_for_year,
set_default_free_allowance_for_service,
@@ -810,6 +813,11 @@ def update_templates():
_clear_templates_from_cache()
@notify_command(name="generate-notification-reports")
def generate_notification_reports():
    generate_notification_reports_task()
def _clear_templates_from_cache():
    # When we update-templates in the db, we need to make sure to delete them
    # from redis, otherwise the old versions will stick around forever.
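Note on the command wiring above: calling generate_notification_reports_task() directly runs the task body inside the CLI process, which is fine for a manual one-off. If the intent were to hand it to a worker instead, the usual Celery pattern would be as follows (apply_async and the PERIODIC queue name are assumed from the rest of this commit):

from app.celery.tasks import generate_notification_reports_task
from app.config import QueueNames

# Enqueue the report sweep on a worker instead of running it in-process.
generate_notification_reports_task.apply_async(queue=QueueNames.PERIODIC)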

View File

@@ -287,6 +287,11 @@ class Config(object):
"schedule": crontab(minute="*/30"),
"options": {"queue": QueueNames.PERIODIC},
},
"generate-notifications-reports": {
"task": "generate-notifications-reports",
"schedule": crontab(hour=1, minute=0),
"options": {"queue": QueueNames.PERIODIC},
},
"regenerate-job-cache-on-startup": {
"task": "regenerate-job-cache",
"schedule": crontab(

View File

@@ -1,10 +1,6 @@
import csv
import io
import itertools
import logging
import os
import time
import uuid
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
@@ -15,7 +11,7 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.datastructures import MultiDict
from app import db, notify_celery
from app import db
from app.aws.s3 import get_personalisation_from_s3, get_phone_number_from_s3
from app.config import QueueNames
from app.dao import fact_notification_status_dao, notifications_dao
@@ -124,10 +120,8 @@ from app.user.users_schema import post_set_permissions_schema
from app.utils import (
    check_suspicious_id,
    get_prev_next_pagination_links,
    hilite,
    utc_now,
)
from notifications_utils.s3 import s3upload
celery_logger = logging.getLogger(__name__)
@@ -135,18 +129,6 @@ service_blueprint = Blueprint("service", __name__)
register_errors(service_blueprint)
NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"
def get_csv_location(service_id, upload_id):
    return (
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        NEW_FILE_LOCATION_STRUCTURE.format(service_id, upload_id),
        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
    )
@service_blueprint.errorhandler(IntegrityError)
def handle_integrity_error(exc):
@@ -536,131 +518,7 @@ def get_service_history(service_id):
return jsonify(data=data)
@notify_celery.task(name="generate-notifications-report")
def generate_notifications_report(
    service_id, report_id, data, request_method, request_args
):
current_app.logger.debug(hilite("ENTER generate_notifications_report"))
page = 1
page_size = 20000
limit_days = data.get("limit_days")
include_jobs = data.get("include_jobs", True)
include_from_test_key = data.get("include_from_test_key", False)
include_one_off = data.get("include_one_off", True)
current_app.logger.debug(hilite("GET ALL PARAMS"))
current_app.logger.debug(
f"get pagination with {service_id} service_id filters {data} \
limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
)
start_time = time.time()
current_app.logger.debug(f"Start report generation with page.size {page_size}")
    pagination = notifications_dao.get_notifications_for_service(
        service_id,
        filter_dict=data,
        page=page,
        page_size=page_size,
        count_pages=False,
        limit_days=limit_days,
        include_jobs=include_jobs,
        include_from_test_key=include_from_test_key,
        include_one_off=include_one_off,
    )
    current_app.logger.debug(
        f"Query complete in {int((time.time() - start_time) * 1000)}ms"
    )
    current_app.logger.debug(f"HOW MANY ITEMS IN PAGINATION? {len(pagination.items)}")
    count = 1
    for notification in pagination.items:
        if notification.job_id is not None:
            current_app.logger.debug(
                f"Processing job_id {notification.job_id} which is row {count}"
            )
            count += 1
            notification.personalisation = get_personalisation_from_s3(
                notification.service_id,
                notification.job_id,
                notification.job_row_number,
            )
            recipient = get_phone_number_from_s3(
                notification.service_id,
                notification.job_id,
                notification.job_row_number,
            )
            notification.to = recipient
            notification.normalised_to = recipient
        else:
            notification.to = ""
            notification.normalised_to = ""
    kwargs = request_args
    kwargs["service_id"] = service_id
    notifications = [
        notification.serialize_for_csv() for notification in pagination.items
    ]
current_app.logger.debug(hilite(f"NUMBER OF NOTFICIATIONS IS {len(notifications)}"))
# We try and get the next page of results to work out if we need provide a pagination link to the next page
# in our response if it exists. Note, this could be done instead by changing `count_pages` in the previous
# call to be True which will enable us to use Flask-Sqlalchemy to tell if there is a next page of results but
# this way is much more performant for services with many results (unlike Flask SqlAlchemy, this approach
# doesn't do an additional query to count all the results of which there could be millions but instead only
# asks for a single extra page of results).
    csv_bytes = io.BytesIO()
    text_wrapper = io.TextIOWrapper(csv_bytes, encoding="utf-8", newline="")
    writer = csv.writer(text_wrapper)
    writer.writerows(notifications)
    text_wrapper.flush()
    csv_bytes.seek(0)
    bucket_name, file_location, access_key, secret_key, region = get_csv_location(
        service_id, report_id
    )
    if bucket_name == "":
        exp_bucket = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
        exp_region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
        tier = os.getenv("NOTIFY_ENVIRONMENT")
        raise Exception(
            f"NO BUCKET NAME SHOULD BE: {exp_bucket} WITH REGION {exp_region} TIER {tier}"
        )
    current_app.logger.debug(f"UPLOADING THIS {csv_bytes}")
    s3upload(
        filedata=csv_bytes,
        region=region,
        bucket_name=bucket_name,
        file_location=file_location,
        access_key=access_key,
        secret_key=secret_key,
    )
    current_app.logger.debug(hilite("FINITO"))
@service_blueprint.route("/<uuid:service_id>/notifications-report", methods=["GET", "POST"])
def get_notifications_report_for_service(service_id):
current_app.logger.debug(hilite("ENTER GET ALL NOTIFICATIONS FOR SERVICE@"))
check_suspicious_id(service_id)
report_id = str(uuid.uuid4())
request_method = request.method
if request_method == "GET":
data = notifications_filter_schema.load(request.args)
else:
data = notifications_filter_schema.load(
MultiDict(request.get_json(silent=True))
)
request_args = request.args.to_dict()
current_app.logger.debug(hilite("INVOKE APPLY_ASYNC"))
generate_notifications_report.apply_async(
args=[service_id, report_id, data, request_method, request_args],
queue=QueueNames.NOTIFY,
)
return jsonify({"report_id": report_id}), 200
@service_blueprint.route("/<uuid:service_id>/notifications", methods=["GET", "POST"])
@service_blueprint.route("/<uuid:service_id>/notificationsx", methods=["GET", "POST"])
def get_all_notifications_for_service(service_id):
    check_suspicious_id(service_id)
    current_app.logger.debug("enter get_all_notifications_for_service")