change page size

Kenneth Kehl
2025-05-21 13:44:14 -07:00
parent 860d4c73e3
commit 2788c21420
3 changed files with 34 additions and 30 deletions

View File

@@ -5,7 +5,7 @@ import string
import time
import uuid
from contextlib import contextmanager
from multiprocessing import Manager
from threading import Lock
from time import monotonic
from celery import Celery, Task, current_task
@@ -32,6 +32,9 @@ from notifications_utils.clients.encryption.encryption_client import Encryption
from notifications_utils.clients.redis.redis_client import RedisClient
from notifications_utils.clients.zendesk.zendesk_client import ZendeskClient
job_cache = {}
job_cache_lock = Lock()
class NotifyCelery(Celery):
    def init_app(self, app):
@@ -152,9 +155,6 @@ def create_app(application):
    redis_store.init_app(application)
    document_download_client.init_app(application)
    manager = Manager()
    application.config["job_cache"] = manager.dict()
    register_blueprint(application)
    # avoid circular imports by importing this file later
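The first file swaps the multiprocessing.Manager proxy dict that used to live in application.config["job_cache"] for a plain module-level dict guarded by a threading.Lock. A minimal sketch of the before/after, assuming a single worker process whose threads (or eventlet greenlets) share module state; the key and values below are illustrative only:

# Before (sketch): Manager() starts a separate server process and hands back a proxy
# dict, so every read and write goes through inter-process communication.
from multiprocessing import Manager
manager = Manager()
shared_cache = manager.dict()

# After (sketch): inside one process, a plain dict shared at module level is enough,
# with a Lock to serialise concurrent writers.
from threading import Lock
job_cache = {}
job_cache_lock = Lock()

with job_cache_lock:
    job_cache["job-1234"] = ("parsed csv contents", 1747862654.0)  # (value, expiry)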

View File

@@ -9,6 +9,7 @@ import eventlet
from boto3 import Session
from flask import current_app
from app import job_cache, job_cache_lock
from app.clients import AWS_CLIENT_CONFIG
from notifications_utils import aware_utcnow
@@ -32,30 +33,25 @@ def get_service_id_from_key(key):
def set_job_cache(key, value):
current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
job_cache = current_app.config["job_cache"]
job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
# current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
with job_cache_lock:
job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
def get_job_cache(key):
job_cache = current_app.config["job_cache"]
ret = job_cache.get(key)
if ret is None:
current_app.logger.warning(f"Could not find {key} in the job_cache.")
else:
current_app.logger.debug(f"Got {key} from job_cache with value {ret}.")
return ret
def len_job_cache():
job_cache = current_app.config["job_cache"]
ret = len(job_cache)
current_app.logger.debug(f"Length of job_cache is {ret}")
return ret
def clean_cache():
job_cache = current_app.config["job_cache"]
current_time = time.time()
keys_to_delete = []
for key, (_, expiry_time) in job_cache.items():
@@ -65,8 +61,9 @@ def clean_cache():
    current_app.logger.debug(
        f"Deleting the following keys from the job_cache: {keys_to_delete}"
    )
    for key in keys_to_delete:
        del job_cache[key]
    with job_cache_lock:
        for key in keys_to_delete:
            del job_cache[key]
def get_s3_client():
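Each cache entry is stored as a (value, expiry) tuple with an eight-day TTL (time.time() + 8 * 24 * 60 * 60), and clean_cache prunes expired keys while holding the lock. A self-contained sketch of the same TTL mechanism, with hypothetical names, for reference:

import time
from threading import Lock

TTL_SECONDS = 8 * 24 * 60 * 60  # eight days, as in the diff above

_cache = {}
_lock = Lock()

def put(key, value):
    # Store the value alongside its absolute expiry timestamp.
    with _lock:
        _cache[key] = (value, time.time() + TTL_SECONDS)

def get(key):
    entry = _cache.get(key)
    if entry is None:
        return None
    value, expiry = entry
    return value if expiry > time.time() else None

def clean():
    # Collect and delete expired keys under the lock so writers cannot race the sweep.
    now = time.time()
    with _lock:
        for key in [k for k, (_, expiry) in _cache.items() if expiry < now]:
            del _cache[key]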
@@ -207,9 +204,8 @@ def read_s3_file(bucket_name, object_key, s3res):
            extract_personalisation(job),
        )
    except LookupError:
        # perhaps our key is not formatted as we expected. If so skip it.
        current_app.logger.exception("LookupError #notify-debug-admin-1200")
    except Exception:
        current_app.logger.exception("Exception")
def get_s3_files():
@@ -308,9 +304,7 @@ def file_exists(file_location):
def get_job_location(service_id, job_id):
    current_app.logger.debug(
        f"#notify-debug-s3-partitioning NEW JOB_LOCATION: {NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
    )
    return (
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id),
@@ -326,9 +320,7 @@ def get_old_job_location(service_id, job_id):
    but it will take a few days where we have to support both formats.
    Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE.
    """
    current_app.logger.debug(
        f"#notify-debug-s3-partitioning OLD JOB LOCATION: {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
    )
    return (
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        FILE_LOCATION_STRUCTURE.format(service_id, job_id),
@@ -467,7 +459,6 @@ def extract_personalisation(job):
def get_phone_number_from_s3(service_id, job_id, job_row_number):
    job = get_job_cache(job_id)
    if job is None:
        current_app.logger.debug(f"job {job_id} was not in the cache")
        job = get_job_from_s3(service_id, job_id)
        # Even if it is None, put it here to avoid KeyErrors
        set_job_cache(job_id, job)
@@ -481,8 +472,11 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
        )
        return "Unavailable"
    phones = extract_phones(job, service_id, job_id)
    set_job_cache(f"{job_id}_phones", phones)
    phones = get_job_cache(f"{job_id}_phones")
    if phones is None:
        current_app.logger.debug("HAVE TO REEXTRACT PHONES!")
        phones = extract_phones(job, service_id, job_id)
        set_job_cache(f"{job_id}_phones", phones)
    # If we can find the quick dictionary, use it
    phone_to_return = phones[job_row_number]
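get_phone_number_from_s3 now consults the cache for the extracted phone list before re-extracting it, rather than re-extracting on every call. A simplified sketch of that check-then-recompute step, assuming the set_job_cache/get_job_cache/extract_phones helpers shown above (note that get_job_cache returns the stored (value, expiry) tuple, so a hit needs unwrapping):

def phones_for_job(service_id, job_id, job):
    # Serve the previously extracted phone list when it is still cached.
    cached = get_job_cache(f"{job_id}_phones")
    if cached is not None:
        return cached[0]
    # Cache miss: extract from the raw job contents and cache the result for next time.
    phones = extract_phones(job, service_id, job_id)
    set_job_cache(f"{job_id}_phones", phones)
    return phones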
@@ -501,7 +495,6 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
    # So this is a little recycling mechanism to reduce the number of downloads.
    job = get_job_cache(job_id)
    if job is None:
        current_app.logger.debug(f"job {job_id} was not in the cache")
        job = get_job_from_s3(service_id, job_id)
        # Even if it is None, put it here to avoid KeyErrors
        set_job_cache(job_id, job)
@@ -519,7 +512,9 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
        )
        return {}
    set_job_cache(f"{job_id}_personalisation", extract_personalisation(job))
    personalisation = get_job_cache(f"{job_id}_personalisation")
    if personalisation is None:
        set_job_cache(f"{job_id}_personalisation", extract_personalisation(job))
    return get_job_cache(f"{job_id}_personalisation")[0].get(job_row_number)
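get_personalisation_from_s3 follows the same pattern: populate the personalisation cache only on a miss, then serve the requested row from the cached dictionary via [0].get(job_row_number). For reference, a cache entry under that key could look like the following (illustrative data only; the real values come from the uploaded CSV):

# Hypothetical value stored under f"{job_id}_personalisation":
# a (value, expiry) tuple whose value maps CSV row number -> that row's template fields.
(
    {
        0: {"name": "Ada", "appointment_time": "10:00"},
        1: {"name": "Grace", "appointment_time": "10:30"},
    },
    1748553854.0,  # time.time() + 8 * 24 * 60 * 60, set by set_job_cache
)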

View File

@@ -1,4 +1,5 @@
import itertools
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
@@ -504,6 +505,8 @@ def get_all_notifications_for_service(service_id):
if "page_size" in data
else current_app.config.get("PAGE_SIZE")
)
# HARD CODE TO 100 for now. 1000 or 10000 causes reports to time out before they complete (if big)
page_size = 100
limit_days = data.get("limit_days")
include_jobs = data.get("include_jobs", True)
include_from_test_key = data.get("include_from_test_key", False)
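The page size is capped at 100 because each returned notification that has a job_id triggers a per-row personalisation lookup (see the loop further down), so report latency grows roughly with the page size; 1000 or 10000 rows can exceed the request timeout. A hedged sketch of how a caller could still assemble a full report by walking pages of 100, using a hypothetical fetch helper rather than the service's real client:

PAGE_SIZE = 100

def fetch_all_notifications(fetch_page, service_id):
    # fetch_page(service_id, page, page_size) -> list of rows; hypothetical signature.
    rows, page = [], 1
    while True:
        batch = fetch_page(service_id, page=page, page_size=PAGE_SIZE)
        rows.extend(batch)
        if len(batch) < PAGE_SIZE:
            break
        page += 1
    return rows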
@@ -517,6 +520,8 @@ def get_all_notifications_for_service(service_id):
f"get pagination with {service_id} service_id filters {data} \
limit_days {limit_days} include_jobs {include_jobs} include_one_off {include_one_off}"
)
start_time = time.time()
current_app.logger.debug(f"Start report generation with page.size {page_size}")
pagination = notifications_dao.get_notifications_for_service(
service_id,
filter_dict=data,
@@ -528,9 +533,13 @@ def get_all_notifications_for_service(service_id):
        include_from_test_key=include_from_test_key,
        include_one_off=include_one_off,
    )
    current_app.logger.debug(f"Query complete at {int((time.time() - start_time) * 1000)} ms")
    for notification in pagination.items:
        if notification.job_id is not None:
            current_app.logger.debug(
                f"Processing job_id {notification.job_id} at {int((time.time() - start_time) * 1000)} ms"
            )
            notification.personalisation = get_personalisation_from_s3(
                notification.service_id,
                notification.job_id,