From 875d378b3d5a4b748f761443ab1554296a081b5b Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Fri, 19 Jul 2024 13:58:23 -0700 Subject: [PATCH 1/7] fix ExpiringDict caching solution --- app/__init__.py | 1 + app/aws/s3.py | 60 +++++++++++++++++++++++++++++++++++++++++++++ app/celery/tasks.py | 6 +++++ app/config.py | 5 ++++ poetry.lock | 6 +---- 5 files changed, 73 insertions(+), 5 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index c08c4ae0a..5d10966e8 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -252,6 +252,7 @@ def register_blueprint(application): def init_app(app): + @app.before_request def record_request_details(): g.start = monotonic() diff --git a/app/aws/s3.py b/app/aws/s3.py index 9466e6cce..749e454dd 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -19,6 +19,65 @@ JOBS_CACHE_HITS = "JOBS_CACHE_HITS" JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES" +def list_s3_objects(): + print("ENTER LIST_S3_OBJECTS") + bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] + access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"] + secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"] + region = current_app.config["CSV_UPLOAD_BUCKET"]["region"] + session = Session( + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region, + ) + s3 = session.client("s3") + + objects = [] + try: + response = s3.list_objects_v2(Bucket=bucket_name) + while True: + for obj in response.get("Contents", []): + objects.append(obj["Key"]) + if "NextContinuationToken" in response: + response = s3.list_objects_v2( + Bucket=bucket_name, + ContinuationToken=response["NextContinuationToken"], + ) + else: + break + except Exception as e: + current_app.logger.error(f"An error occurred while regenerating cache {e}") + return objects + + +def get_s3_files(): + print("ENTER GET_S3_FILES") + bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] + access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"] + secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"] + region = current_app.config["CSV_UPLOAD_BUCKET"]["region"] + session = Session( + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region, + ) + objects = list_s3_objects() + + s3res = session.resource("s3", config=AWS_CLIENT_CONFIG) + print(f"LEN JOBS BEFORE LOAD {len(JOBS)}") + for object in objects: + object_arr = object.split("/") + job_id = object_arr[1] + job_id = job_id.replace(".csv", "") + if JOBS.get(job_id) is None: + object = ( + s3res.Object(bucket_name, object).get()["Body"].read().decode("utf-8") + ) + if "phone number" in object.lower(): + JOBS[job_id] = object + print(f"NOW LEN JOBS IS {len(JOBS)} object {object}") + + def get_s3_file(bucket_name, file_location, access_key, secret_key, region): s3_file = get_s3_object(bucket_name, file_location, access_key, secret_key, region) return s3_file.get()["Body"].read().decode("utf-8") @@ -146,6 +205,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): if job is None: job = get_job_from_s3(service_id, job_id) JOBS[job_id] = job + print(f"CACHE MISS FOR JOB_ID {job_id}") incr_jobs_cache_misses() else: incr_jobs_cache_hits() diff --git a/app/celery/tasks.py b/app/celery/tasks.py index f0d036549..5498fda7b 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -441,6 +441,12 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): ) +@notify_celery.task(name="regenerate-job-cache") +def regenerate_job_cache(): + print("ENTER REGENERATE_JOB_CACHE") + s3.get_s3_files() + + @notify_celery.task(name="process-incomplete-jobs") def process_incomplete_jobs(job_ids): jobs = [dao_get_job_by_id(job_id) for job_id in job_ids] diff --git a/app/config.py b/app/config.py index 8d913bdd8..e1b6d5597 100644 --- a/app/config.py +++ b/app/config.py @@ -249,6 +249,11 @@ class Config(object): "schedule": crontab(hour=6, minute=0), "options": {"queue": QueueNames.PERIODIC}, }, + "regenerate-job-cache": { + "task": "regenerate-job-cache", + "schedule": crontab(), + "options": {"queue": QueueNames.PERIODIC}, + }, "cleanup-unfinished-jobs": { "task": "cleanup-unfinished-jobs", "schedule": crontab(hour=4, minute=5), diff --git a/poetry.lock b/poetry.lock index c6de7033d..9b23a4db8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2098,13 +2098,9 @@ files = [ {file = "lxml-5.2.2-cp36-cp36m-win_amd64.whl", hash = "sha256:edcfa83e03370032a489430215c1e7783128808fd3e2e0a3225deee278585196"}, {file = "lxml-5.2.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:28bf95177400066596cdbcfc933312493799382879da504633d16cf60bba735b"}, {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a745cc98d504d5bd2c19b10c79c61c7c3df9222629f1b6210c0368177589fb8"}, - {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1b590b39ef90c6b22ec0be925b211298e810b4856909c8ca60d27ffbca6c12e6"}, {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b336b0416828022bfd5a2e3083e7f5ba54b96242159f83c7e3eebaec752f1716"}, - {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_28_aarch64.whl", hash = "sha256:c2faf60c583af0d135e853c86ac2735ce178f0e338a3c7f9ae8f622fd2eb788c"}, {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_28_x86_64.whl", hash = "sha256:4bc6cb140a7a0ad1f7bc37e018d0ed690b7b6520ade518285dc3171f7a117905"}, - {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:7ff762670cada8e05b32bf1e4dc50b140790909caa8303cfddc4d702b71ea184"}, {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:57f0a0bbc9868e10ebe874e9f129d2917750adf008fe7b9c1598c0fbbfdde6a6"}, - {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:a6d2092797b388342c1bc932077ad232f914351932353e2e8706851c870bca1f"}, {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:60499fe961b21264e17a471ec296dcbf4365fbea611bf9e303ab69db7159ce61"}, {file = "lxml-5.2.2-cp37-cp37m-win32.whl", hash = "sha256:d9b342c76003c6b9336a80efcc766748a333573abf9350f4094ee46b006ec18f"}, {file = "lxml-5.2.2-cp37-cp37m-win_amd64.whl", hash = "sha256:b16db2770517b8799c79aa80f4053cd6f8b716f21f8aca962725a9565ce3ee40"}, @@ -2493,6 +2489,7 @@ files = [ {file = "msgpack-1.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fbb160554e319f7b22ecf530a80a3ff496d38e8e07ae763b9e82fadfe96f273"}, {file = "msgpack-1.0.8-cp39-cp39-win32.whl", hash = "sha256:f9af38a89b6a5c04b7d18c492c8ccf2aee7048aff1ce8437c4683bb5a1df893d"}, {file = "msgpack-1.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:ed59dd52075f8fc91da6053b12e8c89e37aa043f8986efd89e61fae69dc1b011"}, + {file = "msgpack-1.0.8-py3-none-any.whl", hash = "sha256:24f727df1e20b9876fa6e95f840a2a2651e34c0ad147676356f4bf5fbb0206ca"}, {file = "msgpack-1.0.8.tar.gz", hash = "sha256:95c02b0e27e706e48d0e5426d1710ca78e0f0628d6e89d5b5a5b91a5f12274f3"}, ] @@ -3475,7 +3472,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, From 4d24b8257e5ae8438468ceb400c89075ac38236d Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Fri, 19 Jul 2024 15:02:54 -0700 Subject: [PATCH 2/7] clean up --- app/aws/s3.py | 7 +++---- app/config.py | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 749e454dd..20e88ed4d 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -20,7 +20,6 @@ JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES" def list_s3_objects(): - print("ENTER LIST_S3_OBJECTS") bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"] secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"] @@ -51,7 +50,7 @@ def list_s3_objects(): def get_s3_files(): - print("ENTER GET_S3_FILES") + current_app.logger.info("Regenerate job cache #notify-admin-1200") bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"] secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"] @@ -64,7 +63,7 @@ def get_s3_files(): objects = list_s3_objects() s3res = session.resource("s3", config=AWS_CLIENT_CONFIG) - print(f"LEN JOBS BEFORE LOAD {len(JOBS)}") + current_app.logger.info(f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200") for object in objects: object_arr = object.split("/") job_id = object_arr[1] @@ -75,7 +74,7 @@ def get_s3_files(): ) if "phone number" in object.lower(): JOBS[job_id] = object - print(f"NOW LEN JOBS IS {len(JOBS)} object {object}") + current_app.logger.info(f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200") def get_s3_file(bucket_name, file_location, access_key, secret_key, region): diff --git a/app/config.py b/app/config.py index e1b6d5597..65ef6b2d3 100644 --- a/app/config.py +++ b/app/config.py @@ -251,7 +251,7 @@ class Config(object): }, "regenerate-job-cache": { "task": "regenerate-job-cache", - "schedule": crontab(), + "schedule": crontab(minute="*/30"), "options": {"queue": QueueNames.PERIODIC}, }, "cleanup-unfinished-jobs": { From aa630039375a156432760d3d561c3f7d1704d6b1 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 22 Jul 2024 07:21:03 -0700 Subject: [PATCH 3/7] cleanup --- app/aws/s3.py | 12 +++++++++--- app/celery/tasks.py | 1 - 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 20e88ed4d..f2de75d73 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -45,7 +45,9 @@ def list_s3_objects(): else: break except Exception as e: - current_app.logger.error(f"An error occurred while regenerating cache {e}") + current_app.logger.error( + f"An error occurred while regenerating cache #notify-admin-1200 {e}" + ) return objects @@ -63,7 +65,9 @@ def get_s3_files(): objects = list_s3_objects() s3res = session.resource("s3", config=AWS_CLIENT_CONFIG) - current_app.logger.info(f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200") + current_app.logger.info( + f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200" + ) for object in objects: object_arr = object.split("/") job_id = object_arr[1] @@ -74,7 +78,9 @@ def get_s3_files(): ) if "phone number" in object.lower(): JOBS[job_id] = object - current_app.logger.info(f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200") + current_app.logger.info( + f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200" + ) def get_s3_file(bucket_name, file_location, access_key, secret_key, region): diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 5498fda7b..e6ed717e7 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -443,7 +443,6 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): @notify_celery.task(name="regenerate-job-cache") def regenerate_job_cache(): - print("ENTER REGENERATE_JOB_CACHE") s3.get_s3_files() From f61a47acef072945bdf9a1d71ca02be9257da025 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 22 Jul 2024 09:11:21 -0700 Subject: [PATCH 4/7] code review feedback --- app/aws/s3.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index f2de75d73..bfc6cd64c 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -69,15 +69,24 @@ def get_s3_files(): f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200" ) for object in objects: - object_arr = object.split("/") - job_id = object_arr[1] - job_id = job_id.replace(".csv", "") - if JOBS.get(job_id) is None: - object = ( - s3res.Object(bucket_name, object).get()["Body"].read().decode("utf-8") - ) - if "phone number" in object.lower(): - JOBS[job_id] = object + # We put our csv files in the format "service-{service_id}-notify/{job_id}" + try: + object_arr = object.split("/") + job_id = object_arr[1] # get the job_id + job_id = job_id.replace(".csv", "") # we just want the job_id + if JOBS.get(job_id) is None: + object = ( + s3res.Object(bucket_name, object) + .get()["Body"] + .read() + .decode("utf-8") + ) + if "phone number" in object.lower(): + JOBS[job_id] = object + except LookupError as le: + # perhaps our key is not formatted as we expected. If so skip it. + current_app.logger.error(f"LookupError {le} #notify-admin-1200") + current_app.logger.info( f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200" ) From e97b567e9eee9b219a5c9f6fc59199ac9e49d254 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 22 Jul 2024 10:05:21 -0700 Subject: [PATCH 5/7] code review feedback --- app/aws/s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index bfc6cd64c..0720c59ed 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -36,7 +36,7 @@ def list_s3_objects(): response = s3.list_objects_v2(Bucket=bucket_name) while True: for obj in response.get("Contents", []): - objects.append(obj["Key"]) + yield obj["Key"] if "NextContinuationToken" in response: response = s3.list_objects_v2( Bucket=bucket_name, @@ -48,7 +48,7 @@ def list_s3_objects(): current_app.logger.error( f"An error occurred while regenerating cache #notify-admin-1200 {e}" ) - return objects + def get_s3_files(): From 946b1e9d65650358d10fa0f77b71e62d55c589f3 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 22 Jul 2024 10:26:25 -0700 Subject: [PATCH 6/7] fix flake8 --- app/aws/s3.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 0720c59ed..6030d0ee2 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -31,7 +31,6 @@ def list_s3_objects(): ) s3 = session.client("s3") - objects = [] try: response = s3.list_objects_v2(Bucket=bucket_name) while True: @@ -50,7 +49,6 @@ def list_s3_objects(): ) - def get_s3_files(): current_app.logger.info("Regenerate job cache #notify-admin-1200") bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] From f1e09908a63fa40e9e07124a5306a37e3f5c6d8a Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Mon, 22 Jul 2024 10:59:05 -0700 Subject: [PATCH 7/7] remove print statement --- app/aws/s3.py | 1 - 1 file changed, 1 deletion(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 6030d0ee2..ebdffddd5 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -217,7 +217,6 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): if job is None: job = get_job_from_s3(service_id, job_id) JOBS[job_id] = job - print(f"CACHE MISS FOR JOB_ID {job_id}") incr_jobs_cache_misses() else: incr_jobs_cache_hits()