From ec6bfd82258e785aa1c8ae2cb419a2198c4074b3 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 25 Sep 2024 12:26:01 -0700 Subject: [PATCH 1/8] improve report performance --- app/aws/s3.py | 64 +++++++++++++++++++++++++++++++++++---------------- app/config.py | 2 +- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 52e2a5eb1..3f2af6183 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -1,6 +1,7 @@ import datetime import re import time +from concurrent.futures import ThreadPoolExecutor import botocore from boto3 import Session @@ -115,33 +116,56 @@ def cleanup_old_s3_objects(): ) -def get_s3_files(): +def read_s3_file(bucket_name, object_key, s3res): + """ + This method runs during the 'regenerate job cache' task. + Note that in addition to retrieving the jobs and putting them + into the cache, this method also does some pre-processing by + putting a list of all phone numbers into the cache as well. + This means that when the report needs to be regenerated, it + can easily find the phone numbers in the cache through JOBS[_phones] + and the personalization through JOBS[_personalisation], which + in theory should make report generation a lot faster. + + We are moving processing from the front end where the user can see it + in wait time, to this back end process. + """ + try: + + object_arr = object_key.split("/") + job_id = object_arr[1] # get the job_id + job_id = job_id.replace(".csv", "") # we just want the job_id + if JOBS.get(job_id) is None: + object = ( + s3res.Object(bucket_name, object_key) + .get()["Body"] + .read() + .decode("utf-8") + ) + if "phone number" in object.lower(): + JOBS[job_id] = object + JOBS[f"{job_id}_phones"] = extract_phones(object) + JOBS[f"{job_id}_personalisation"] = extract_personalisation(object) + except LookupError: + # perhaps our key is not formatted as we expected. If so skip it. + current_app.logger.exception("LookupError #notify-admin-1200") + + +def get_s3_files(): + """ + We're using the ThreadPoolExecutor here to speed up the retrieval of S3 + csv files for scaling needs. + """ bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] - objects = list_s3_objects() + object_keys = list_s3_objects() s3res = get_s3_resource() current_app.logger.info( f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200" ) - for object in objects: - # We put our csv files in the format "service-{service_id}-notify/{job_id}" - try: - object_arr = object.split("/") - job_id = object_arr[1] # get the job_id - job_id = job_id.replace(".csv", "") # we just want the job_id - if JOBS.get(job_id) is None: - object = ( - s3res.Object(bucket_name, object) - .get()["Body"] - .read() - .decode("utf-8") - ) - if "phone number" in object.lower(): - JOBS[job_id] = object - except LookupError: - # perhaps our key is not formatted as we expected. If so skip it. - current_app.logger.exception("LookupError #notify-admin-1200") + with ThreadPoolExecutor() as executor: + executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys) current_app.logger.info( f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200" diff --git a/app/config.py b/app/config.py index 71fa4ed23..9a4412615 100644 --- a/app/config.py +++ b/app/config.py @@ -256,7 +256,7 @@ class Config(object): }, "regenerate-job-cache": { "task": "regenerate-job-cache", - "schedule": crontab(minute="*/30"), + "schedule": crontab(minute="*/3"), "options": {"queue": QueueNames.PERIODIC}, }, "regenerate-job-cache-on-startup": { From 291890b154013e5e52466c2218cd39b27be69b7f Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 25 Sep 2024 12:32:30 -0700 Subject: [PATCH 2/8] revert change to task schedule --- app/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/config.py b/app/config.py index 9a4412615..71fa4ed23 100644 --- a/app/config.py +++ b/app/config.py @@ -256,7 +256,7 @@ class Config(object): }, "regenerate-job-cache": { "task": "regenerate-job-cache", - "schedule": crontab(minute="*/3"), + "schedule": crontab(minute="*/30"), "options": {"queue": QueueNames.PERIODIC}, }, "regenerate-job-cache-on-startup": { From e5ac50b694b698a1384907eb8f8a6153a76aba35 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Thu, 26 Sep 2024 07:17:12 -0700 Subject: [PATCH 3/8] add test --- .ds.baseline | 4 ++-- app/aws/s3.py | 13 +++++++++---- app/config.py | 2 +- tests/app/aws/test_s3.py | 16 ++++++++++++++++ 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/.ds.baseline b/.ds.baseline index 6ef3c9108..26b862646 100644 --- a/.ds.baseline +++ b/.ds.baseline @@ -209,7 +209,7 @@ "filename": "tests/app/aws/test_s3.py", "hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747", "is_verified": false, - "line_number": 27, + "line_number": 28, "is_secret": false } ], @@ -384,5 +384,5 @@ } ] }, - "generated_at": "2024-09-10T18:12:39Z" + "generated_at": "2024-09-26T14:17:05Z" } diff --git a/app/aws/s3.py b/app/aws/s3.py index 3f2af6183..bbb06e602 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -116,6 +116,13 @@ def cleanup_old_s3_objects(): ) +def get_job_id_from_s3_object_key(key): + object_arr = key.split("/") + job_id = object_arr[1] # get the job_id + job_id = job_id.replace(".csv", "") # we just want the job_id + return job_id + + def read_s3_file(bucket_name, object_key, s3res): """ This method runs during the 'regenerate job cache' task. @@ -132,10 +139,7 @@ def read_s3_file(bucket_name, object_key, s3res): in wait time, to this back end process. """ try: - - object_arr = object_key.split("/") - job_id = object_arr[1] # get the job_id - job_id = job_id.replace(".csv", "") # we just want the job_id + job_id = get_job_id_from_s3_object_key(object_key) if JOBS.get(job_id) is None: object = ( s3res.Object(bucket_name, object_key) @@ -147,6 +151,7 @@ def read_s3_file(bucket_name, object_key, s3res): JOBS[job_id] = object JOBS[f"{job_id}_phones"] = extract_phones(object) JOBS[f"{job_id}_personalisation"] = extract_personalisation(object) + except LookupError: # perhaps our key is not formatted as we expected. If so skip it. current_app.logger.exception("LookupError #notify-admin-1200") diff --git a/app/config.py b/app/config.py index 71fa4ed23..9a4412615 100644 --- a/app/config.py +++ b/app/config.py @@ -256,7 +256,7 @@ class Config(object): }, "regenerate-job-cache": { "task": "regenerate-job-cache", - "schedule": crontab(minute="*/30"), + "schedule": crontab(minute="*/3"), "options": {"queue": QueueNames.PERIODIC}, }, "regenerate-job-cache-on-startup": { diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index dcc1cbe44..17222d2f0 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -8,6 +8,7 @@ from app.aws.s3 import ( cleanup_old_s3_objects, file_exists, get_job_from_s3, + get_job_id_from_s3_object_key, get_personalisation_from_s3, get_phone_number_from_s3, get_s3_file, @@ -102,6 +103,21 @@ def test_get_phone_number_from_s3( assert phone_number == expected_phone_number +@pytest.mark.parametrize( + "key, expected_job_id", + [ + ("service-blahblahblah-notify/abcde.csv", "abcde"), + ( + "service-x-notify/4c99f361-4ed7-49b1-bd6f-02fe0c807c53.csv", + "4c99f361-4ed7-49b1-bd6f-02fe0c807c53", + ), + ], +) +def test_get_job_id_from_s3_object_key(key, expected_job_id): + actual_job_id = get_job_id_from_s3_object_key(key) + assert actual_job_id == expected_job_id + + def mock_s3_get_object_slowdown(*args, **kwargs): error_response = { "Error": { From 16b357faa8c9086212f34da1c03684ee8d2bbf25 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Fri, 27 Sep 2024 09:55:08 -0700 Subject: [PATCH 4/8] add comment --- app/aws/s3.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/app/aws/s3.py b/app/aws/s3.py index 4272b2887..657311016 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -164,6 +164,12 @@ def read_s3_file(bucket_name, object_key, s3res): .read() .decode("utf-8") ) + # TODO we probably don't need this phone number check anymore. + # Originally, there were csv uploads that were not valid + # for messaging sending. They may have been experimental, or + # they may have not been caught by validation. When we're sure + # all csv files have a phone number column, we can remove the if. + # We are essentially just making sure this object looks like a job. if "phone number" in object.lower(): set_job_cache(job_cache, job_id, object) set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object)) From e99f4bc6b59a43ac1059a8a542b3e69404b25f7e Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Fri, 27 Sep 2024 12:38:27 -0700 Subject: [PATCH 5/8] suppress warnings --- app/aws/s3.py | 7 +++++-- app/clients/__init__.py | 1 + notifications_utils/logging.py | 6 ++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 657311016..5e9662965 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -196,8 +196,11 @@ def get_s3_files(): current_app.logger.info( f"job_cache length before regen: {len(job_cache)} #notify-admin-1200" ) - with ThreadPoolExecutor() as executor: - executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys) + try: + with ThreadPoolExecutor() as executor: + executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys) + except Exception: + current_app.logger.exception("Connection pool issue") current_app.logger.info( f"job_cache length after regen: {len(job_cache)} #notify-admin-1200" diff --git a/app/clients/__init__.py b/app/clients/__init__.py index 9c1f4af68..cfd50d88c 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -13,6 +13,7 @@ AWS_CLIENT_CONFIG = Config( "addressing_style": "virtual", }, use_fips_endpoint=True, + max_pool_connections=50, ) diff --git a/notifications_utils/logging.py b/notifications_utils/logging.py index 3ec092b8c..dc55ae653 100644 --- a/notifications_utils/logging.py +++ b/notifications_utils/logging.py @@ -39,6 +39,12 @@ def init_app(app): for logger_instance, handler in product(warning_loggers, handlers): logger_instance.addHandler(handler) logger_instance.setLevel(logging.WARNING) + + # Suppress specific loggers to prevent leaking sensitive info + logging.getLogger("boto3").setLevel(logging.ERROR) + logging.getLogger("botocore").setLevel(logging.ERROR) + logging.getLogger("urllib3").setLevel(logging.ERROR) + app.logger.info("Logging configured") From 0d82f89bb5c7b413c3b3d0dbd30b2ad0e95f87c7 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Fri, 27 Sep 2024 14:18:42 -0700 Subject: [PATCH 6/8] reduce max connections to 10 --- app/clients/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/app/clients/__init__.py b/app/clients/__init__.py index cfd50d88c..993054279 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -13,7 +13,10 @@ AWS_CLIENT_CONFIG = Config( "addressing_style": "virtual", }, use_fips_endpoint=True, - max_pool_connections=50, + # This is the default but just for doc sake + # there may come a time when increasing this helps + # with job cache management + max_pool_connections=10, ) From f844da8c68c98f5dbf271c2e35fce885d45aafd2 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 30 Sep 2024 08:38:49 -0700 Subject: [PATCH 7/8] update comment --- app/clients/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/clients/__init__.py b/app/clients/__init__.py index 993054279..19b719c1c 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -15,7 +15,7 @@ AWS_CLIENT_CONFIG = Config( use_fips_endpoint=True, # This is the default but just for doc sake # there may come a time when increasing this helps - # with job cache management + # with job cache management. max_pool_connections=10, ) From 544e7e61e49be84724869c7011a5cea3ad7824c9 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 30 Sep 2024 09:08:18 -0700 Subject: [PATCH 8/8] code review feedback --- app/aws/s3.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 5e9662965..256800bf9 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -164,20 +164,13 @@ def read_s3_file(bucket_name, object_key, s3res): .read() .decode("utf-8") ) - # TODO we probably don't need this phone number check anymore. - # Originally, there were csv uploads that were not valid - # for messaging sending. They may have been experimental, or - # they may have not been caught by validation. When we're sure - # all csv files have a phone number column, we can remove the if. - # We are essentially just making sure this object looks like a job. - if "phone number" in object.lower(): - set_job_cache(job_cache, job_id, object) - set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object)) - set_job_cache( - job_cache, - f"{job_id}_personalisation", - extract_personalisation(object), - ) + set_job_cache(job_cache, job_id, object) + set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object)) + set_job_cache( + job_cache, + f"{job_id}_personalisation", + extract_personalisation(object), + ) except LookupError: # perhaps our key is not formatted as we expected. If so skip it.