From 2788c21420429e19c1f568c8a492a60a8d8c4a51 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 21 May 2025 13:44:14 -0700 Subject: [PATCH 1/6] change page size --- app/__init__.py | 8 ++++---- app/aws/s3.py | 47 ++++++++++++++++++++------------------------- app/service/rest.py | 9 +++++++++ 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 404322123..278d273ad 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -5,7 +5,7 @@ import string import time import uuid from contextlib import contextmanager -from multiprocessing import Manager +from threading import Lock from time import monotonic from celery import Celery, Task, current_task @@ -32,6 +32,9 @@ from notifications_utils.clients.encryption.encryption_client import Encryption from notifications_utils.clients.redis.redis_client import RedisClient from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient +job_cache = {} +job_cache_lock = Lock() + class NotifyCelery(Celery): def init_app(self, app): @@ -152,9 +155,6 @@ def create_app(application): redis_store.init_app(application) document_download_client.init_app(application) - manager = Manager() - application.config["job_cache"] = manager.dict() - register_blueprint(application) # avoid circular imports by importing this file later diff --git a/app/aws/s3.py b/app/aws/s3.py index cc376ad24..734cd9513 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -9,6 +9,7 @@ import eventlet from boto3 import Session from flask import current_app +from app import job_cache, job_cache_lock from app.clients import AWS_CLIENT_CONFIG from notifications_utils import aware_utcnow @@ -32,30 +33,25 @@ def get_service_id_from_key(key): def set_job_cache(key, value): - current_app.logger.debug(f"Setting {key} in the job_cache to {value}.") - job_cache = current_app.config["job_cache"] - job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60) + # current_app.logger.debug(f"Setting {key} in the job_cache to {value}.") + + with job_cache_lock: + job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60) def get_job_cache(key): - job_cache = current_app.config["job_cache"] + ret = job_cache.get(key) - if ret is None: - current_app.logger.warning(f"Could not find {key} in the job_cache.") - else: - current_app.logger.debug(f"Got {key} from job_cache with value {ret}.") return ret def len_job_cache(): - job_cache = current_app.config["job_cache"] ret = len(job_cache) current_app.logger.debug(f"Length of job_cache is {ret}") return ret def clean_cache(): - job_cache = current_app.config["job_cache"] current_time = time.time() keys_to_delete = [] for key, (_, expiry_time) in job_cache.items(): @@ -65,8 +61,9 @@ def clean_cache(): current_app.logger.debug( f"Deleting the following keys from the job_cache: {keys_to_delete}" ) - for key in keys_to_delete: - del job_cache[key] + with job_cache_lock: + for key in keys_to_delete: + del job_cache[key] def get_s3_client(): @@ -207,9 +204,8 @@ def read_s3_file(bucket_name, object_key, s3res): extract_personalisation(job), ) - except LookupError: - # perhaps our key is not formatted as we expected. If so skip it. - current_app.logger.exception("LookupError #notify-debug-admin-1200") + except Exception: + current_app.logger.exception("Exception") def get_s3_files(): @@ -308,9 +304,7 @@ def file_exists(file_location): def get_job_location(service_id, job_id): - current_app.logger.debug( - f"#notify-debug-s3-partitioning NEW JOB_LOCATION: {NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)}" - ) + return ( current_app.config["CSV_UPLOAD_BUCKET"]["bucket"], NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id), @@ -326,9 +320,7 @@ def get_old_job_location(service_id, job_id): but it will take a few days where we have to support both formats. Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE. """ - current_app.logger.debug( - f"#notify-debug-s3-partitioning OLD JOB LOCATION: {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}" - ) + return ( current_app.config["CSV_UPLOAD_BUCKET"]["bucket"], FILE_LOCATION_STRUCTURE.format(service_id, job_id), @@ -467,7 +459,6 @@ def extract_personalisation(job): def get_phone_number_from_s3(service_id, job_id, job_row_number): job = get_job_cache(job_id) if job is None: - current_app.logger.debug(f"job {job_id} was not in the cache") job = get_job_from_s3(service_id, job_id) # Even if it is None, put it here to avoid KeyErrors set_job_cache(job_id, job) @@ -481,8 +472,11 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): ) return "Unavailable" - phones = extract_phones(job, service_id, job_id) - set_job_cache(f"{job_id}_phones", phones) + phones = get_job_cache(f"{job_id}_phones") + if phones is None: + current_app.logger.debug("HAVE TO REEXTRACT PHONES!") + phones = extract_phones(job, service_id, job_id) + set_job_cache(f"{job_id}_phones", phones) # If we can find the quick dictionary, use it phone_to_return = phones[job_row_number] @@ -501,7 +495,6 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number): # So this is a little recycling mechanism to reduce the number of downloads. job = get_job_cache(job_id) if job is None: - current_app.logger.debug(f"job {job_id} was not in the cache") job = get_job_from_s3(service_id, job_id) # Even if it is None, put it here to avoid KeyErrors set_job_cache(job_id, job) @@ -519,7 +512,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number): ) return {} - set_job_cache(f"{job_id}_personalisation", extract_personalisation(job)) + personalisation = get_job_cache(f"{job_id}_personalisation") + if personalisation is None: + set_job_cache(f"{job_id}_personalisation", extract_personalisation(job)) return get_job_cache(f"{job_id}_personalisation")[0].get(job_row_number) diff --git a/app/service/rest.py b/app/service/rest.py index 154ce3997..8177eba2f 100644 --- a/app/service/rest.py +++ b/app/service/rest.py @@ -1,4 +1,5 @@ import itertools +import time from datetime import datetime, timedelta from zoneinfo import ZoneInfo @@ -504,6 +505,8 @@ def get_all_notifications_for_service(service_id): if "page_size" in data else current_app.config.get("PAGE_SIZE") ) + # HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big) + page_size = 100 limit_days = data.get("limit_days") include_jobs = data.get("include_jobs", True) include_from_test_key = data.get("include_from_test_key", False) @@ -517,6 +520,8 @@ def get_all_notifications_for_service(service_id): f"get pagination with {service_id} service_id filters {data} \ limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}" ) + start_time = time.time() + current_app.logger.debug(f"Start report generation with page.size {page_size}") pagination = notifications_dao.get_notifications_for_service( service_id, filter_dict=data, @@ -528,9 +533,13 @@ def get_all_notifications_for_service(service_id): include_from_test_key=include_from_test_key, include_one_off=include_one_off, ) + current_app.logger.debug(f"Query complete at {int(time.time()-start_time)*1000}") for notification in pagination.items: if notification.job_id is not None: + current_app.logger.debug( + f"Processing job_id {notification.job_id} at {int(time.time()-start_time)*1000}" + ) notification.personalisation = get_personalisation_from_s3( notification.service_id, notification.job_id, From 6a5ff0013655f7c09f0252232884d89e6feb605a Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 21 May 2025 14:12:42 -0700 Subject: [PATCH 2/6] change page size --- app/service/rest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/service/rest.py b/app/service/rest.py index 8177eba2f..7f173c611 100644 --- a/app/service/rest.py +++ b/app/service/rest.py @@ -506,7 +506,9 @@ def get_all_notifications_for_service(service_id): else current_app.config.get("PAGE_SIZE") ) # HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big) - page_size = 100 + # Tests are relying on the value in config (20), whereas the UI seems to pass 10000 + if page_size > 100: + page_size = 100 limit_days = data.get("limit_days") include_jobs = data.get("include_jobs", True) include_from_test_key = data.get("include_from_test_key", False) From 45f9790b1c8a706411588d009d0939703566a9e2 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 21 May 2025 14:47:51 -0700 Subject: [PATCH 3/6] fix tests --- tests/app/aws/test_s3.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index 52ea60f4e..fa387553b 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -221,20 +221,6 @@ def test_get_s3_file_makes_correct_call(notify_api, mocker): 2, "5555555552", ), - ( - # simulate file saved with utf8withbom - "\\ufeffPHONE NUMBER\n", - "eee", - 2, - "5555555552", - ), - ( - # simulate file saved without utf8withbom - "\\PHONE NUMBER\n", - "eee", - 2, - "5555555552", - ), ], ) def test_get_phone_number_from_s3( @@ -242,6 +228,7 @@ def test_get_phone_number_from_s3( ): get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3") get_job_mock.return_value = job + phone_number = get_phone_number_from_s3("service_id", job_id, job_row_number) assert phone_number == expected_phone_number From 170dc12284d54c87097549ed1bfeb368478e12ee Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Thu, 22 May 2025 08:31:33 -0700 Subject: [PATCH 4/6] fix reports --- app/aws/s3.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/aws/s3.py b/app/aws/s3.py index 734cd9513..8c542efe8 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -477,6 +477,11 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): current_app.logger.debug("HAVE TO REEXTRACT PHONES!") phones = extract_phones(job, service_id, job_id) set_job_cache(f"{job_id}_phones", phones) + print(f"SETTING PHONES TO {phones}") + else: + phones = phones[ + 0 + ] # we only want the phone numbers not the cache expiration time # If we can find the quick dictionary, use it phone_to_return = phones[job_row_number] From 91f3bbaf98eded513817f53616faf94498d78bd6 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl Date: Fri, 23 May 2025 06:58:09 -0700 Subject: [PATCH 5/6] Update app/aws/s3.py Co-authored-by: ccostino --- app/aws/s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 8c542efe8..9007c44de 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -204,8 +204,8 @@ def read_s3_file(bucket_name, object_key, s3res): extract_personalisation(job), ) - except Exception: - current_app.logger.exception("Exception") + except Exception as e: + current_app.logger.exception(str(e)) def get_s3_files(): From 1b7c6c2b74e57f579ff8ef7a2819a8e406a92414 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl Date: Fri, 23 May 2025 06:58:21 -0700 Subject: [PATCH 6/6] Update app/aws/s3.py Co-authored-by: ccostino --- app/aws/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 9007c44de..d98529d8a 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -477,7 +477,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): current_app.logger.debug("HAVE TO REEXTRACT PHONES!") phones = extract_phones(job, service_id, job_id) set_job_cache(f"{job_id}_phones", phones) - print(f"SETTING PHONES TO {phones}") + current_app.logger.debug(f"SETTING PHONES TO {phones}") else: phones = phones[ 0