Merge pull request #1710 from GSA/blocking

fix regenerate_job_cache
ccostino committed 2025-05-14 15:30:45 -04:00 (committed by GitHub)

2 changed files with 18 additions and 31 deletions

app/aws/s3.py

@@ -2,7 +2,6 @@ import csv
 import datetime
 import re
 import time
-from concurrent.futures import ThreadPoolExecutor
 from io import StringIO
 
 import botocore
@@ -25,8 +24,15 @@ s3_client = None
 s3_resource = None
 
 
+def get_service_id_from_key(key):
+    key = key.replace("service-", "")
+    key = key.split("/")
+    key = key[0].replace("-notify", "")
+    return key
+
+
 def set_job_cache(key, value):
-    current_app.logger.debug(f"Setting {key} in the job_cache.")
+    current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
     job_cache = current_app.config["job_cache"]
     job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
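
The new `get_service_id_from_key` helper recovers the service id from an S3 object key by dropping the `service-` prefix, keeping the first path segment, and stripping the `-notify` suffix. A quick check against a hypothetical key (the real bucket layout may differ):

    def get_service_id_from_key(key):
        # Same logic as the helper added above.
        key = key.replace("service-", "")
        key = key.split("/")
        return key[0].replace("-notify", "")

    # Hypothetical object key, shaped as service-<service_id>-notify/<file>:
    key = "service-12345678-aaaa-bbbb-cccc-1234567890ab-notify/upload.csv"
    assert get_service_id_from_key(key) == "12345678-aaaa-bbbb-cccc-1234567890ab"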
@@ -37,7 +43,7 @@ def get_job_cache(key):
     if ret is None:
         current_app.logger.warning(f"Could not find {key} in the job_cache.")
     else:
-        current_app.logger.debug(f"Got {key} from job_cache.")
+        current_app.logger.debug(f"Got {key} from job_cache with value {ret}.")
     return ret
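
Together, `set_job_cache` and `get_job_cache` implement a dict-backed cache whose entries are `(value, expiry)` tuples with an eight-day TTL. A minimal standalone sketch of the pattern (eviction of expired entries is not part of this diff and is assumed to happen elsewhere):

    import time

    job_cache = {}  # stands in for current_app.config["job_cache"]

    def set_job_cache(key, value):
        # Store the value with an expiry timestamp eight days out.
        job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)

    def get_job_cache(key):
        # Return the (value, expiry) tuple, or None on a miss.
        return job_cache.get(key)

    set_job_cache("job-123", "phone number\n+15551234567\n")
    value, expiry = get_job_cache("job-123")
    assert value.startswith("phone number")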
@@ -185,18 +191,20 @@ def read_s3_file(bucket_name, object_key, s3res):
     """
     try:
         job_id = get_job_id_from_s3_object_key(object_key)
+        service_id = get_service_id_from_key(object_key)
         if get_job_cache(job_id) is None:
-            object = (
+            job = (
                 s3res.Object(bucket_name, object_key)
                 .get()["Body"]
                 .read()
                 .decode("utf-8")
             )
-            set_job_cache(job_id, object)
-            set_job_cache(f"{job_id}_phones", extract_phones(object))
+            set_job_cache(job_id, job)
+            set_job_cache(f"{job_id}_phones", extract_phones(job, service_id, job_id))
             set_job_cache(
                 f"{job_id}_personalisation",
-                extract_personalisation(object),
+                extract_personalisation(job),
             )
     except LookupError:
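
Renaming `object` to `job` also stops shadowing the `object` builtin, and `extract_phones` now receives the service and job ids as well. That function's body is not part of this diff; purely as an illustration, a hypothetical CSV-based version might look like the following (column name, return shape, and logging are all assumptions, not the repo's actual logic):

    import csv
    from io import StringIO

    def extract_phones(job, service_id, job_id):
        # Hypothetical sketch only: read the raw CSV text and collect the
        # phone column by row index, using service_id/job_id for diagnostics.
        phones = {}
        for index, row in enumerate(csv.DictReader(StringIO(job))):
            phone = (row.get("phone number") or "").strip()
            if not phone:
                print(f"No phone in row {index} of job {job_id} (service {service_id})")
            phones[index] = phone
        return phones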
@@ -217,8 +225,8 @@ def get_s3_files():
         f"job_cache length before regen: {len_job_cache()} #notify-debug-admin-1200"
     )
     try:
-        with ThreadPoolExecutor() as executor:
-            executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
+        for object_key in object_keys:
+            read_s3_file(bucket_name, object_key, s3res)
     except Exception:
         current_app.logger.exception("Connection pool issue")
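
Beyond serializing the S3 reads, dropping `ThreadPoolExecutor` closes an error-handling gap: `Executor.map` returns a lazy iterator, and a worker's exception is only raised when its result is consumed. Because the old code never iterated the map results, failures inside `read_s3_file` were silently discarded and the `except Exception` handler could not fire. A short demonstration of that behavior:

    from concurrent.futures import ThreadPoolExecutor

    def boom(key):
        raise RuntimeError(f"failed on {key}")

    # No exception is raised here: map() only schedules the work.
    with ThreadPoolExecutor() as executor:
        results = executor.map(boom, ["file1.csv", "file2.csv"])

    # The stored exception surfaces only when the results are consumed.
    try:
        list(results)
    except RuntimeError as e:
        print(e)  # failed on file1.csv

The plain loop raises immediately, so the logging in the `except` block actually runs.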

tests/app/aws/test_s3.py

@@ -39,7 +39,7 @@ default_region = getenv("CSV_AWS_REGION")
 def single_s3_object_stub(key="foo", last_modified=None):
     return {
-        "ETag": '"d41d8cd98f00b204e9800998ecf8427e"',
+        "ETag": '"d"',
         "Key": key,
         "LastModified": last_modified or utc_now(),
     }
@@ -420,29 +420,17 @@ def test_get_s3_files_success(client, mocker):
         "CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"},
         "job_cache": {},
     }
-    mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor")
     mock_read_s3_file = mocker.patch("app.aws.s3.read_s3_file")
     mock_list_s3_objects = mocker.patch("app.aws.s3.list_s3_objects")
     mock_get_s3_resource = mocker.patch("app.aws.s3.get_s3_resource")
 
     mock_list_s3_objects.return_value = ["file1.csv", "file2.csv"]
     mock_s3_resource = MagicMock()
     mock_get_s3_resource.return_value = mock_s3_resource
-    mock_executor = MagicMock()
-
-    def mock_map(func, iterable):
-        for item in iterable:
-            func(item)
-
-    mock_executor.map.side_effect = mock_map
-    mock_thread_pool_executor.return_value.__enter__.return_value = mock_executor
 
     get_s3_files()
     # mock_current_app.config.__getitem__.assert_called_once_with("CSV_UPLOAD_BUCKET")
     mock_list_s3_objects.assert_called_once()
-    mock_thread_pool_executor.assert_called_once()
-    mock_executor.map.assert_called_once()
 
     calls = [
        (("test-bucket", "file1.csv", mock_s3_resource),),
@@ -615,15 +603,6 @@ def test_get_s3_files_handles_exception(mocker):
     mock_read_s3_file = mocker.patch(
         "app.aws.s3.read_s3_file", side_effect=[None, Exception("exception here")]
     )
-    mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor")
-    mock_executor = mock_thread_pool_executor.return_value.__enter__.return_value
-
-    def mock_map(func, iterable):
-        for item in iterable:
-            func(item)
-
-    mock_executor.map.side_effect = mock_map
 
     get_s3_files()
 
     calls = [