From 26ffb931c99cce0fb9eb0aa01cf955510db69525 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Fri, 27 Sep 2024 08:38:11 -0700 Subject: [PATCH] optimize S3 partitioning --- app/aws/s3.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/app/aws/s3.py b/app/aws/s3.py index 52e2a5eb1..f358a13f3 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -12,6 +12,7 @@ from app.clients import AWS_CLIENT_CONFIG from notifications_utils import aware_utcnow FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv" +NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv" # Temporarily extend cache to 7 days ttl = 60 * 60 * 24 * 7 @@ -211,6 +212,21 @@ def file_exists(file_location): def get_job_location(service_id, job_id): + return ( + current_app.config["CSV_UPLOAD_BUCKET"]["bucket"], + NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id), + current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"], + current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"], + current_app.config["CSV_UPLOAD_BUCKET"]["region"], + ) + + +def get_old_job_location(service_id, job_id): + """ + This is deprecated. We are transitioning to NEW_FILE_LOCATION_STRUCTURE, + but it will take a few days where we have to support both formats. + Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE. + """ return ( current_app.config["CSV_UPLOAD_BUCKET"]["bucket"], FILE_LOCATION_STRUCTURE.format(service_id, job_id), @@ -239,9 +255,11 @@ def get_job_from_s3(service_id, job_id): max_retries = 4 backoff_factor = 0.2 - if not file_exists(FILE_LOCATION_STRUCTURE.format(service_id, job_id)): + if not file_exists( + FILE_LOCATION_STRUCTURE.format(service_id, job_id) + ) and not file_exists(NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)): current_app.logger.error( - f"This file does not exist {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}" + f"This file with service_id {service_id} and job_id {job_id} does not exist" ) return None @@ -249,6 +267,9 @@ def get_job_from_s3(service_id, job_id): try: obj = get_s3_object(*get_job_location(service_id, job_id)) + # TODO remove this when we have fully transitioned to the NEW_FILE_LOCATION_STRUCTURE + if obj is None: + obj = get_s3_object(*get_old_job_location(service_id, job_id)) return obj.get()["Body"].read().decode("utf-8") except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] in [ @@ -257,7 +278,7 @@ def get_job_from_s3(service_id, job_id): "SlowDown", ]: current_app.logger.exception( - f"Retrying job fetch {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}", + f"Retrying job fetch retry_count={retries}", ) retries += 1 sleep_time = backoff_factor * (2**retries) # Exponential backoff @@ -266,18 +287,18 @@ def get_job_from_s3(service_id, job_id): else: # Typically this is "NoSuchKey" current_app.logger.exception( - f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}", + f"Failed to get job with service_id {service_id} job_id {job_id}", ) return None except Exception: current_app.logger.exception( - f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}", + f"Failed to get job with service_id {service_id} job_id {job_id}retry_count={retries}", ) return None current_app.logger.error( - f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}", + f"Never retrieved job with service_id {service_id} job_id {job_id}", ) return None @@ -358,7 +379,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): if job is None: current_app.logger.error( - f"Couldnt find phone for job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} because job is missing" + f"Couldnt find phone for job with service_id {service_id} job_id {job_id} because job is missing" ) return "Unavailable"