From b9eec694239ee2947c9df55286292e5222c922b2 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Tue, 13 Aug 2024 14:51:51 -0700 Subject: [PATCH 1/4] add retry with backoff --- app/aws/s3.py | 35 +++++++++++++++++++++++++++++------ app/config.py | 14 +++++++++++++- app/service/rest.py | 10 ---------- poetry.lock | 5 +---- 4 files changed, 43 insertions(+), 21 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 760e4dedf..a0c3aad2c 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -1,4 +1,5 @@ import re +import time import botocore from boto3 import Session @@ -107,15 +108,15 @@ def download_from_s3( result = None try: result = s3.download_file(bucket_name, s3_key, local_filename) - print(f"File downloaded successfully to {local_filename}") + current_app.logger.info(f"File downloaded successfully to {local_filename}") except botocore.exceptions.NoCredentialsError as nce: - print("Credentials not found") + current_app.logger.error("Credentials not found") raise Exception(nce) except botocore.exceptions.PartialCredentialsError as pce: - print("Incomplete credentials provided") + current_app.logger.error("Incomplete credentials provided") raise Exception(pce) except Exception as e: - print(f"An error occurred {e}") + current_app.logger.error(f"An error occurred {e}") text = f"EXCEPTION {e} local_filename {local_filename}" raise Exception(text) return result @@ -171,8 +172,29 @@ def get_job_and_metadata_from_s3(service_id, job_id): def get_job_from_s3(service_id, job_id): - obj = get_s3_object(*get_job_location(service_id, job_id)) - return obj.get()["Body"].read().decode("utf-8") + retries = 0 + max_retries = 5 + backoff_factor = 1 + while retries < max_retries: + + try: + obj = get_s3_object(*get_job_location(service_id, job_id)) + return obj.get()["Body"].read().decode("utf-8") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in [ + "Throttling", + "RequestTimeout", + "SlowDown", + ]: + retries += 1 + sleep_time = backoff_factor * (2**retries) # Exponential backoff + time.sleep(sleep_time) + continue + except Exception as e: + current_app.logger.error(f"Failed to get object from bucket {e}") + raise + + raise Exception(f"Failed to get object after 5 attempts") def incr_jobs_cache_misses(): @@ -241,6 +263,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): # So this is a little recycling mechanism to reduce the number of downloads. job = JOBS.get(job_id) if job is None: + current_app.logger.info(f"job {job_id} was not in the cache") job = get_job_from_s3(service_id, job_id) JOBS[job_id] = job incr_jobs_cache_misses() diff --git a/app/config.py b/app/config.py index 65ef6b2d3..a8c8fd9de 100644 --- a/app/config.py +++ b/app/config.py @@ -1,5 +1,5 @@ import json -from datetime import timedelta +from datetime import datetime, timedelta from os import getenv, path from celery.schedules import crontab @@ -165,6 +165,8 @@ class Config(object): # we only need real email in Live environment (production) DVLA_EMAIL_ADDRESSES = json.loads(getenv("DVLA_EMAIL_ADDRESSES", "[]")) + current_minute = (datetime.now().minute + 1) % 60 + CELERY = { "broker_url": REDIS_URL, "broker_transport_options": { @@ -254,6 +256,16 @@ class Config(object): "schedule": crontab(minute="*/30"), "options": {"queue": QueueNames.PERIODIC}, }, + "regenerate-job-cache-on-startup": { + "task": "regenerate-job-cache", + "schedule": crontab( + minute=current_minute + ), # Runs once at the next minute + "options": { + "queue": QueueNames.PERIODIC, + "expires": 60, + }, # Ensure it doesn't run if missed + }, "cleanup-unfinished-jobs": { "task": "cleanup-unfinished-jobs", "schedule": crontab(hour=4, minute=5), diff --git a/app/service/rest.py b/app/service/rest.py index 687cf5a23..b61ea0394 100644 --- a/app/service/rest.py +++ b/app/service/rest.py @@ -511,11 +511,6 @@ def get_all_notifications_for_service(service_id): ) except ClientError as ex: if ex.response["Error"]["Code"] == "NoSuchKey": - s = notification.service_id - j = notification.job_id - current_app.logger.warning( - f"No personalisation found for s3 file location service: service-{s}-notify/{j}.csv" - ) notification.personalisation = "" else: raise ex @@ -531,11 +526,6 @@ def get_all_notifications_for_service(service_id): notification.normalised_to = recipient except ClientError as ex: if ex.response["Error"]["Code"] == "NoSuchKey": - s = notification.service_id - j = notification.job_id - current_app.logger.warning( - f"No phone number found for s3 file location service: service-{s}-notify/{j}.csv" - ) notification.to = "" notification.normalised_to = "" else: diff --git a/poetry.lock b/poetry.lock index 7842eb263..178795e6f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2093,13 +2093,9 @@ files = [ {file = "lxml-5.2.2-cp36-cp36m-win_amd64.whl", hash = "sha256:edcfa83e03370032a489430215c1e7783128808fd3e2e0a3225deee278585196"}, {file = "lxml-5.2.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:28bf95177400066596cdbcfc933312493799382879da504633d16cf60bba735b"}, {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3a745cc98d504d5bd2c19b10c79c61c7c3df9222629f1b6210c0368177589fb8"}, - {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1b590b39ef90c6b22ec0be925b211298e810b4856909c8ca60d27ffbca6c12e6"}, {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b336b0416828022bfd5a2e3083e7f5ba54b96242159f83c7e3eebaec752f1716"}, - {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_28_aarch64.whl", hash = "sha256:c2faf60c583af0d135e853c86ac2735ce178f0e338a3c7f9ae8f622fd2eb788c"}, {file = "lxml-5.2.2-cp37-cp37m-manylinux_2_28_x86_64.whl", hash = "sha256:4bc6cb140a7a0ad1f7bc37e018d0ed690b7b6520ade518285dc3171f7a117905"}, - {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:7ff762670cada8e05b32bf1e4dc50b140790909caa8303cfddc4d702b71ea184"}, {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:57f0a0bbc9868e10ebe874e9f129d2917750adf008fe7b9c1598c0fbbfdde6a6"}, - {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_2_aarch64.whl", hash = "sha256:a6d2092797b388342c1bc932077ad232f914351932353e2e8706851c870bca1f"}, {file = "lxml-5.2.2-cp37-cp37m-musllinux_1_2_x86_64.whl", hash = "sha256:60499fe961b21264e17a471ec296dcbf4365fbea611bf9e303ab69db7159ce61"}, {file = "lxml-5.2.2-cp37-cp37m-win32.whl", hash = "sha256:d9b342c76003c6b9336a80efcc766748a333573abf9350f4094ee46b006ec18f"}, {file = "lxml-5.2.2-cp37-cp37m-win_amd64.whl", hash = "sha256:b16db2770517b8799c79aa80f4053cd6f8b716f21f8aca962725a9565ce3ee40"}, @@ -2488,6 +2484,7 @@ files = [ {file = "msgpack-1.0.8-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5fbb160554e319f7b22ecf530a80a3ff496d38e8e07ae763b9e82fadfe96f273"}, {file = "msgpack-1.0.8-cp39-cp39-win32.whl", hash = "sha256:f9af38a89b6a5c04b7d18c492c8ccf2aee7048aff1ce8437c4683bb5a1df893d"}, {file = "msgpack-1.0.8-cp39-cp39-win_amd64.whl", hash = "sha256:ed59dd52075f8fc91da6053b12e8c89e37aa043f8986efd89e61fae69dc1b011"}, + {file = "msgpack-1.0.8-py3-none-any.whl", hash = "sha256:24f727df1e20b9876fa6e95f840a2a2651e34c0ad147676356f4bf5fbb0206ca"}, {file = "msgpack-1.0.8.tar.gz", hash = "sha256:95c02b0e27e706e48d0e5426d1710ca78e0f0628d6e89d5b5a5b91a5f12274f3"}, ] From 76c02a15071d39921f2ef56769d88a3301ae5032 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Tue, 13 Aug 2024 14:55:55 -0700 Subject: [PATCH 2/4] fix flake8 --- app/aws/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index a0c3aad2c..2b7feaf15 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -194,7 +194,7 @@ def get_job_from_s3(service_id, job_id): current_app.logger.error(f"Failed to get object from bucket {e}") raise - raise Exception(f"Failed to get object after 5 attempts") + raise Exception("Failed to get object after 5 attempts") def incr_jobs_cache_misses(): From db9197e7a685e834d0ffd016141af6c619fe4294 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Tue, 13 Aug 2024 15:32:43 -0700 Subject: [PATCH 3/4] ugh secrets --- .ds.baseline | 4 ++-- tests/app/aws/test_s3.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/.ds.baseline b/.ds.baseline index 8b6703173..20143d6cd 100644 --- a/.ds.baseline +++ b/.ds.baseline @@ -209,7 +209,7 @@ "filename": "tests/app/aws/test_s3.py", "hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747", "is_verified": false, - "line_number": 24, + "line_number": 25, "is_secret": false } ], @@ -384,5 +384,5 @@ } ] }, - "generated_at": "2024-08-01T17:38:39Z" + "generated_at": "2024-08-13T22:32:28Z" } diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index c009c369c..7ff3eac01 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -6,6 +6,7 @@ from botocore.exceptions import ClientError from app.aws.s3 import ( file_exists, + get_job_from_s3, get_personalisation_from_s3, get_phone_number_from_s3, get_s3_file, @@ -86,6 +87,21 @@ def test_get_phone_number_from_s3( phone_number = get_phone_number_from_s3("service_id", job_id, job_row_number) assert phone_number == expected_phone_number +def mock_s3_get_object_slowdown(*args, **kwargs): + error_response = { + 'Error': { + 'Code': 'SlowDown', + 'Message': 'Reduce your request rate', + } + } + raise ClientError(error_response, 'GetObject') + +def test_get_job_from_s3_exponential_backoff(mocker): + get_s3_object = mocker.patch("app.aws.s3.get_s3_object", side_effect=mock_s3_get_object_slowdown) + with pytest.raises(Exception) as exc_info: + job = get_job_from_s3("service_id", "job_id") + assert 'Failed to get object after 5 attempts' in str(exc_info) + @pytest.mark.parametrize( "job, job_id, job_row_number, expected_personalisation", From 7258105c916f434463b9be7d9de249b608d3177c Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Tue, 13 Aug 2024 15:45:39 -0700 Subject: [PATCH 4/4] fix flake 8 --- tests/app/aws/test_s3.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index 7ff3eac01..d625f6b06 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -87,20 +87,22 @@ def test_get_phone_number_from_s3( phone_number = get_phone_number_from_s3("service_id", job_id, job_row_number) assert phone_number == expected_phone_number + def mock_s3_get_object_slowdown(*args, **kwargs): error_response = { - 'Error': { - 'Code': 'SlowDown', - 'Message': 'Reduce your request rate', + "Error": { + "Code": "SlowDown", + "Message": "Reduce your request rate", } } - raise ClientError(error_response, 'GetObject') + raise ClientError(error_response, "GetObject") + def test_get_job_from_s3_exponential_backoff(mocker): - get_s3_object = mocker.patch("app.aws.s3.get_s3_object", side_effect=mock_s3_get_object_slowdown) + mocker.patch("app.aws.s3.get_s3_object", side_effect=mock_s3_get_object_slowdown) with pytest.raises(Exception) as exc_info: - job = get_job_from_s3("service_id", "job_id") - assert 'Failed to get object after 5 attempts' in str(exc_info) + get_job_from_s3("service_id", "job_id") + assert "Failed to get object after 5 attempts" in str(exc_info) @pytest.mark.parametrize(