Merge pull request #1339 from GSA/faster_s3_downloads

improve report performance
This commit is contained in:
Kenneth Kehl
2024-10-01 08:59:37 -07:00
committed by GitHub
5 changed files with 85 additions and 24 deletions

View File

@@ -209,7 +209,7 @@
"filename": "tests/app/aws/test_s3.py",
"hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747",
"is_verified": false,
"line_number": 28,
"line_number": 29,
"is_secret": false
}
],
@@ -384,5 +384,5 @@
}
]
},
"generated_at": "2024-09-26T20:29:19Z"
"generated_at": "2024-09-27T16:42:53Z"
}

View File

@@ -1,6 +1,7 @@
import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager
import botocore
@@ -132,33 +133,67 @@ def cleanup_old_s3_objects():
)
def get_s3_files():
def get_job_id_from_s3_object_key(key):
object_arr = key.split("/")
job_id = object_arr[1] # get the job_id
job_id = job_id.replace(".csv", "") # we just want the job_id
return job_id
def read_s3_file(bucket_name, object_key, s3res):
"""
This method runs during the 'regenerate job cache' task.
Note that in addition to retrieving the jobs and putting them
into the cache, this method also does some pre-processing by
putting a list of all phone numbers into the cache as well.
This means that when the report needs to be regenerated, it
can easily find the phone numbers in the cache through job_cache[<job_id>_phones]
and the personalization through job_cache[<job_id>_personalisation], which
in theory should make report generation a lot faster.
We are moving processing from the front end where the user can see it
in wait time, to this back end process.
"""
try:
job_id = get_job_id_from_s3_object_key(object_key)
if job_cache.get(job_id) is None:
object = (
s3res.Object(bucket_name, object_key)
.get()["Body"]
.read()
.decode("utf-8")
)
set_job_cache(job_cache, job_id, object)
set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
set_job_cache(
job_cache,
f"{job_id}_personalisation",
extract_personalisation(object),
)
except LookupError:
# perhaps our key is not formatted as we expected. If so skip it.
current_app.logger.exception("LookupError #notify-admin-1200")
def get_s3_files():
"""
We're using the ThreadPoolExecutor here to speed up the retrieval of S3
csv files for scaling needs.
"""
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
objects = list_s3_objects()
object_keys = list_s3_objects()
s3res = get_s3_resource()
current_app.logger.info(
f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
)
for object in objects:
# We put our csv files in the format "service-{service_id}-notify/{job_id}"
try:
object_arr = object.split("/")
job_id = object_arr[1] # get the job_id
job_id = job_id.replace(".csv", "") # we just want the job_id
if job_cache.get(job_id) is None:
object = (
s3res.Object(bucket_name, object)
.get()["Body"]
.read()
.decode("utf-8")
)
if "phone number" in object.lower():
set_job_cache(job_cache, job_id, object)
except LookupError:
# perhaps our key is not formatted as we expected. If so skip it.
current_app.logger.exception("LookupError #notify-admin-1200")
try:
with ThreadPoolExecutor() as executor:
executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
except Exception:
current_app.logger.exception("Connection pool issue")
current_app.logger.info(
f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
@@ -363,7 +398,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
)
return "Unavailable"
# If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
# If we look in the job_cache for the quick lookup dictionary of phones for a given job
# and that dictionary is not there, create it
if job_cache.get(f"{job_id}_phones") is None:
phones = extract_phones(job)
@@ -400,7 +435,7 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
)
return {}
# If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
# If we look in the job_cache for the quick lookup dictionary of personalisations for a given job
# and that dictionary is not there, create it
if job_cache.get(f"{job_id}_personalisation") is None:
set_job_cache(

View File

@@ -13,6 +13,10 @@ AWS_CLIENT_CONFIG = Config(
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
# there may come a time when increasing this helps
# with job cache management.
max_pool_connections=10,
)

View File

@@ -39,6 +39,12 @@ def init_app(app):
for logger_instance, handler in product(warning_loggers, handlers):
logger_instance.addHandler(handler)
logger_instance.setLevel(logging.WARNING)
# Suppress specific loggers to prevent leaking sensitive info
logging.getLogger("boto3").setLevel(logging.ERROR)
logging.getLogger("botocore").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
app.logger.info("Logging configured")

View File

@@ -9,6 +9,7 @@ from app.aws.s3 import (
cleanup_old_s3_objects,
file_exists,
get_job_from_s3,
get_job_id_from_s3_object_key,
get_personalisation_from_s3,
get_phone_number_from_s3,
get_s3_file,
@@ -117,6 +118,21 @@ def test_get_phone_number_from_s3(
assert phone_number == expected_phone_number
@pytest.mark.parametrize(
"key, expected_job_id",
[
("service-blahblahblah-notify/abcde.csv", "abcde"),
(
"service-x-notify/4c99f361-4ed7-49b1-bd6f-02fe0c807c53.csv",
"4c99f361-4ed7-49b1-bd6f-02fe0c807c53",
),
],
)
def test_get_job_id_from_s3_object_key(key, expected_job_id):
actual_job_id = get_job_id_from_s3_object_key(key)
assert actual_job_id == expected_job_id
def mock_s3_get_object_slowdown(*args, **kwargs):
error_response = {
"Error": {