diff --git a/app/aws/s3.py b/app/aws/s3.py
index 4f448535a..e226f88c2 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -2,7 +2,6 @@ import csv
 import datetime
 import re
 import time
-from concurrent.futures import ThreadPoolExecutor
 from io import StringIO
 
 import botocore
@@ -25,8 +24,15 @@ s3_client = None
 s3_resource = None
 
 
+def get_service_id_from_key(key):
+    key = key.replace("service-", "")
+    key = key.split("/")
+    key = key[0].replace("-notify", "")
+    return key
+
+
 def set_job_cache(key, value):
-    current_app.logger.debug(f"Setting {key} in the job_cache.")
+    current_app.logger.debug(f"Setting {key} in the job_cache to {value}.")
     job_cache = current_app.config["job_cache"]
     job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
 
@@ -37,7 +43,7 @@ def get_job_cache(key):
     if ret is None:
         current_app.logger.warning(f"Could not find {key} in the job_cache.")
     else:
-        current_app.logger.debug(f"Got {key} from job_cache.")
+        current_app.logger.debug(f"Got {key} from job_cache with value {ret}.")
     return ret
 
 
@@ -185,18 +191,20 @@ def read_s3_file(bucket_name, object_key, s3res):
     """
     try:
         job_id = get_job_id_from_s3_object_key(object_key)
+        service_id = get_service_id_from_key(object_key)
+
         if get_job_cache(job_id) is None:
-            object = (
+            job = (
                 s3res.Object(bucket_name, object_key)
                 .get()["Body"]
                 .read()
                 .decode("utf-8")
             )
-            set_job_cache(job_id, object)
-            set_job_cache(f"{job_id}_phones", extract_phones(object))
+            set_job_cache(job_id, job)
+            set_job_cache(f"{job_id}_phones", extract_phones(job, service_id, job_id))
             set_job_cache(
                 f"{job_id}_personalisation",
-                extract_personalisation(object),
+                extract_personalisation(job),
             )
 
     except LookupError:
@@ -217,8 +225,8 @@ def get_s3_files():
         f"job_cache length before regen: {len_job_cache()} #notify-debug-admin-1200"
     )
     try:
-        with ThreadPoolExecutor() as executor:
-            executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
+        for object_key in object_keys:
+            read_s3_file(bucket_name, object_key, s3res)
     except Exception:
         current_app.logger.exception("Connection pool issue")
 
diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py
index e58a27275..cd6b76962 100644
--- a/tests/app/aws/test_s3.py
+++ b/tests/app/aws/test_s3.py
@@ -39,7 +39,7 @@ default_region = getenv("CSV_AWS_REGION")
 
 def single_s3_object_stub(key="foo", last_modified=None):
     return {
-        "ETag": '"d41d8cd98f00b204e9800998ecf8427e"',
+        "ETag": '"d"',
         "Key": key,
         "LastModified": last_modified or utc_now(),
     }
@@ -420,29 +420,17 @@ def test_get_s3_files_success(client, mocker):
         "CSV_UPLOAD_BUCKET": {"bucket": "test-bucket"},
         "job_cache": {},
     }
-    mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor")
     mock_read_s3_file = mocker.patch("app.aws.s3.read_s3_file")
     mock_list_s3_objects = mocker.patch("app.aws.s3.list_s3_objects")
    mock_get_s3_resource = mocker.patch("app.aws.s3.get_s3_resource")
     mock_list_s3_objects.return_value = ["file1.csv", "file2.csv"]
     mock_s3_resource = MagicMock()
     mock_get_s3_resource.return_value = mock_s3_resource
-    mock_executor = MagicMock()
-
-    def mock_map(func, iterable):
-        for item in iterable:
-            func(item)
-
-    mock_executor.map.side_effect = mock_map
-    mock_thread_pool_executor.return_value.__enter__.return_value = mock_executor
 
     get_s3_files()
 
     # mock_current_app.config.__getitem__.assert_called_once_with("CSV_UPLOAD_BUCKET")
     mock_list_s3_objects.assert_called_once()
-    mock_thread_pool_executor.assert_called_once()
-
-    mock_executor.map.assert_called_once()
 
     calls = [
         (("test-bucket", "file1.csv", mock_s3_resource),),
@@ -615,15 +603,6 @@ def test_get_s3_files_handles_exception(mocker):
     mock_read_s3_file = mocker.patch(
         "app.aws.s3.read_s3_file", side_effect=[None, Exception("exception here")]
     )
-
-    mock_thread_pool_executor = mocker.patch("app.aws.s3.ThreadPoolExecutor")
-    mock_executor = mock_thread_pool_executor.return_value.__enter__.return_value
-
-    def mock_map(func, iterable):
-        for item in iterable:
-            func(item)
-
-    mock_executor.map.side_effect = mock_map
     get_s3_files()
 
     calls = [