Merge branch 'main' into update-marshmallow-deps

This commit is contained in:
Carlo Costino
2025-05-16 09:33:27 -04:00
4 changed files with 222 additions and 232 deletions

View File

@@ -2,7 +2,6 @@ import csv
import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor
from io import StringIO
import botocore
@@ -25,8 +24,15 @@ s3_client = None
s3_resource = None
def get_service_id_from_key(key):
key = key.replace("service-", "")
key = key.split("/")
key = key[0].replace("-notify", "")
return key
def set_job_cache(key, value):
current_app.logger.debug(f"Setting {key} in the job_cache.")
current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
job_cache = current_app.config["job_cache"]
job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
@@ -37,7 +43,7 @@ def get_job_cache(key):
if ret is None:
current_app.logger.warning(f"Could not find {key} in the job_cache.")
else:
current_app.logger.debug(f"Got {key} from job_cache.")
current_app.logger.debug(f"Got {key} from job_cache with value {ret}.")
return ret
@@ -185,18 +191,20 @@ def read_s3_file(bucket_name, object_key, s3res):
"""
try:
job_id = get_job_id_from_s3_object_key(object_key)
service_id = get_service_id_from_key(object_key)
if get_job_cache(job_id) is None:
object = (
job = (
s3res.Object(bucket_name, object_key)
.get()["Body"]
.read()
.decode("utf-8")
)
set_job_cache(job_id, object)
set_job_cache(f"{job_id}_phones", extract_phones(object))
set_job_cache(job_id, job)
set_job_cache(f"{job_id}_phones", extract_phones(job, service_id, job_id))
set_job_cache(
f"{job_id}_personalisation",
extract_personalisation(object),
extract_personalisation(job),
)
except LookupError:
@@ -217,8 +225,8 @@ def get_s3_files():
f"job_cache length before regen: {len_job_cache()} #notify-debug-admin-1200"
)
try:
with ThreadPoolExecutor() as executor:
executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
for object_key in object_keys:
read_s3_file(bucket_name, object_key, s3res)
except Exception:
current_app.logger.exception("Connection pool issue")