From 2788c21420429e19c1f568c8a492a60a8d8c4a51 Mon Sep 17 00:00:00 2001
From: Kenneth Kehl <@kkehl@flexion.us>
Date: Wed, 21 May 2025 13:44:14 -0700
Subject: [PATCH] change page size

---
 app/__init__.py     |  8 ++++----
 app/aws/s3.py       | 47 +++++++++++++++++++++--------------------------
 app/service/rest.py |  9 +++++++++
 3 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/app/__init__.py b/app/__init__.py
index 404322123..278d273ad 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -5,7 +5,7 @@ import string
 import time
 import uuid
 from contextlib import contextmanager
-from multiprocessing import Manager
+from threading import Lock
 from time import monotonic

 from celery import Celery, Task, current_task
@@ -32,6 +32,9 @@ from notifications_utils.clients.encryption.encryption_client import Encryption
 from notifications_utils.clients.redis.redis_client import RedisClient
 from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient

+job_cache = {}
+job_cache_lock = Lock()
+

 class NotifyCelery(Celery):
     def init_app(self, app):
@@ -152,9 +155,6 @@ def create_app(application):
     redis_store.init_app(application)
     document_download_client.init_app(application)

-    manager = Manager()
-    application.config["job_cache"] = manager.dict()
-
     register_blueprint(application)

     # avoid circular imports by importing this file later
diff --git a/app/aws/s3.py b/app/aws/s3.py
index cc376ad24..734cd9513 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -9,6 +9,7 @@ import eventlet
 from boto3 import Session
 from flask import current_app

+from app import job_cache, job_cache_lock
 from app.clients import AWS_CLIENT_CONFIG
 from notifications_utils import aware_utcnow

@@ -32,30 +33,25 @@ def get_service_id_from_key(key):


 def set_job_cache(key, value):
-    current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
-    job_cache = current_app.config["job_cache"]
-    job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
+    # current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
+
+    with job_cache_lock:
+        job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)


 def get_job_cache(key):
-    job_cache = current_app.config["job_cache"]
+
     ret = job_cache.get(key)
-    if ret is None:
-        current_app.logger.warning(f"Could not find {key} in the job_cache.")
-    else:
-        current_app.logger.debug(f"Got {key} from job_cache with value {ret}.")
     return ret


 def len_job_cache():
-    job_cache = current_app.config["job_cache"]
     ret = len(job_cache)
     current_app.logger.debug(f"Length of job_cache is {ret}")
     return ret


 def clean_cache():
-    job_cache = current_app.config["job_cache"]
     current_time = time.time()
     keys_to_delete = []
     for key, (_, expiry_time) in job_cache.items():
@@ -65,8 +61,9 @@ def clean_cache():
     current_app.logger.debug(
         f"Deleting the following keys from the job_cache: {keys_to_delete}"
     )
-    for key in keys_to_delete:
-        del job_cache[key]
+    with job_cache_lock:
+        for key in keys_to_delete:
+            del job_cache[key]


 def get_s3_client():
@@ -207,9 +204,8 @@ def read_s3_file(bucket_name, object_key, s3res):
                 extract_personalisation(job),
             )

-    except LookupError:
-        # perhaps our key is not formatted as we expected. If so skip it.
-        current_app.logger.exception("LookupError #notify-debug-admin-1200")
+    except Exception:
+        current_app.logger.exception("Exception reading S3 file #notify-debug-admin-1200")


 def get_s3_files():
@@ -308,9 +304,7 @@ def file_exists(file_location):


 def get_job_location(service_id, job_id):
-    current_app.logger.debug(
-        f"#notify-debug-s3-partitioning NEW JOB_LOCATION: {NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
-    )
+
     return (
         current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
         NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id),
@@ -326,9 +320,7 @@ def get_old_job_location(service_id, job_id):
     but it will take a few days where we have to support both formats.
     Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE.
     """
-    current_app.logger.debug(
-        f"#notify-debug-s3-partitioning OLD JOB LOCATION: {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
-    )
+
     return (
         current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
         FILE_LOCATION_STRUCTURE.format(service_id, job_id),
@@ -467,7 +459,6 @@ def extract_personalisation(job):
 def get_phone_number_from_s3(service_id, job_id, job_row_number):
     job = get_job_cache(job_id)
     if job is None:
-        current_app.logger.debug(f"job {job_id} was not in the cache")
         job = get_job_from_s3(service_id, job_id)
         # Even if it is None, put it here to avoid KeyErrors
         set_job_cache(job_id, job)
@@ -481,8 +472,11 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
         )
         return "Unavailable"

-    phones = extract_phones(job, service_id, job_id)
-    set_job_cache(f"{job_id}_phones", phones)
+    cached_phones = get_job_cache(f"{job_id}_phones")  # (value, expiry) tuple on a hit
+    phones = cached_phones[0] if cached_phones else extract_phones(job, service_id, job_id)
+    if cached_phones is None:
+        current_app.logger.debug(f"Phones for job {job_id} not cached; re-extracting")
+        set_job_cache(f"{job_id}_phones", phones)

     # If we can find the quick dictionary, use it
     phone_to_return = phones[job_row_number]
@@ -501,7 +495,6 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
     # So this is a little recycling mechanism to reduce the number of downloads.
     job = get_job_cache(job_id)
     if job is None:
-        current_app.logger.debug(f"job {job_id} was not in the cache")
         job = get_job_from_s3(service_id, job_id)
         # Even if it is None, put it here to avoid KeyErrors
         set_job_cache(job_id, job)
@@ -519,7 +512,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
         )
         return {}

-    set_job_cache(f"{job_id}_personalisation", extract_personalisation(job))
+    personalisation = get_job_cache(f"{job_id}_personalisation")
+    if personalisation is None:
+        set_job_cache(f"{job_id}_personalisation", extract_personalisation(job))

     return get_job_cache(f"{job_id}_personalisation")[0].get(job_row_number)

diff --git a/app/service/rest.py b/app/service/rest.py
index 154ce3997..8177eba2f 100644
--- a/app/service/rest.py
+++ b/app/service/rest.py
@@ -1,4 +1,5 @@
 import itertools
+import time
 from datetime import datetime, timedelta
 from zoneinfo import ZoneInfo

@@ -504,6 +505,8 @@ def get_all_notifications_for_service(service_id):
         if "page_size" in data
         else current_app.config.get("PAGE_SIZE")
     )
+    # Hard code to 100 for now; 1000 or 10000 causes reports to time out before they complete (if big)
+    page_size = 100
     limit_days = data.get("limit_days")
     include_jobs = data.get("include_jobs", True)
     include_from_test_key = data.get("include_from_test_key", False)
@@ -517,6 +520,8 @@ def get_all_notifications_for_service(service_id):
         f"get pagination with {service_id} service_id filters {data} \
             limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
     )
+    start_time = time.time()
+    current_app.logger.debug(f"Start report generation with page_size {page_size}")
     pagination = notifications_dao.get_notifications_for_service(
         service_id,
         filter_dict=data,
@@ -528,9 +533,13 @@ def get_all_notifications_for_service(service_id):
         include_from_test_key=include_from_test_key,
         include_one_off=include_one_off,
     )
+    current_app.logger.debug(f"Query complete in {int((time.time() - start_time) * 1000)} ms")

     for notification in pagination.items:
         if notification.job_id is not None:
+            current_app.logger.debug(
+                f"Processing job_id {notification.job_id} at {int((time.time() - start_time) * 1000)} ms"
+            )
             notification.personalisation = get_personalisation_from_s3(
                 notification.service_id,
                 notification.job_id,
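
Note on the caching pattern above: the patch replaces the multiprocessing.Manager-backed job cache (a proxied dict held in application config) with a module-level dict guarded by a threading.Lock, where each entry is a (value, expiry) tuple that expires eight days after it is written. The sketch below is a minimal, self-contained illustration of that pattern, not code from the repository; the names job_cache, job_cache_lock, set_job_cache, get_job_cache and clean_cache mirror the patch, while TTL_SECONDS is an assumed constant added here for readability.

    import time
    from threading import Lock

    job_cache = {}
    job_cache_lock = Lock()
    TTL_SECONDS = 8 * 24 * 60 * 60  # eight days, matching the patch

    def set_job_cache(key, value):
        # Store the value together with its absolute expiry time, under the lock.
        with job_cache_lock:
            job_cache[key] = (value, time.time() + TTL_SECONDS)

    def get_job_cache(key):
        # Return the (value, expiry) tuple, or None on a miss; callers unwrap with [0].
        return job_cache.get(key)

    def clean_cache():
        # Drop every expired entry in a single critical section.
        now = time.time()
        with job_cache_lock:
            for key in [k for k, (_, expiry) in job_cache.items() if expiry < now]:
                del job_cache[key]

Compared with the Manager-based cache, this keeps one copy per process rather than one shared copy, which is why the S3 read paths in the patch fall back to re-extracting phones and personalisation on a cache miss.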