mirror of https://github.com/GSA/notifications-api.git
synced 2026-02-02 09:26:08 -05:00
clean up s3
147  app/aws/s3.py
@@ -19,26 +19,52 @@ JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)
 JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
 JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"
 
+# Global variable
+s3_client = None
+s3_resource = None
+
+
+def get_s3_client():
+    global s3_client
+    if s3_client is None:
+        access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
+        secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
+        region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+        session = Session(
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            region_name=region,
+        )
+        s3_client = session.client("s3")
+    return s3_client
+
+
+def get_s3_resource():
+    global s3_resource
+    if s3_resource is None:
+        access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
+        secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
+        region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+        session = Session(
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            region_name=region,
+        )
+        s3_resource = session.resource("s3", config=AWS_CLIENT_CONFIG)
+    return s3_resource
+
+
 def list_s3_objects():
-    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
-    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
-    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.client("s3")
 
+    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
+    s3_client = get_s3_client()
     try:
-        response = s3.list_objects_v2(Bucket=bucket_name)
+        response = s3_client.list_objects_v2(Bucket=bucket_name)
         while True:
             for obj in response.get("Contents", []):
                 yield obj["Key"]
             if "NextContinuationToken" in response:
-                response = s3.list_objects_v2(
+                response = s3_client.list_objects_v2(
                     Bucket=bucket_name,
                     ContinuationToken=response["NextContinuationToken"],
                 )
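The new get_s3_client and get_s3_resource helpers follow a lazy module-level singleton pattern: the boto3 Session and client are built once, on first use, and every later caller reuses the same object instead of rebuilding it from config. Below is a minimal, self-contained sketch of that pattern; the function name, region value, and the __main__ check are illustrative only, not the application's API.

    from boto3 import Session

    _s3_client = None  # module-level cache, populated on first call


    def get_cached_s3_client(region="us-gov-west-1"):
        # Build the boto3 client once and hand back the same object afterwards;
        # in this sketch, credentials are resolved by boto3's default chain.
        global _s3_client
        if _s3_client is None:
            _s3_client = Session(region_name=region).client("s3")
        return _s3_client


    if __name__ == "__main__":
        assert get_cached_s3_client() is get_cached_s3_client()  # same object reused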
@@ -51,19 +77,11 @@ def list_s3_objects():
 
 
 def get_s3_files():
     current_app.logger.info("Regenerate job cache #notify-admin-1200")
     bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
-    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
-    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
     objects = list_s3_objects()
 
-    s3res = session.resource("s3", config=AWS_CLIENT_CONFIG)
+    s3res = get_s3_resource()
     current_app.logger.info(
         f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
     )
@@ -99,12 +117,8 @@ def get_s3_file(bucket_name, file_location, access_key, secret_key, region):
 def download_from_s3(
     bucket_name, s3_key, local_filename, access_key, secret_key, region
 ):
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.client("s3", config=AWS_CLIENT_CONFIG)
+    s3 = get_s3_client()
     result = None
     try:
         result = s3.download_file(bucket_name, s3_key, local_filename)
@@ -123,27 +137,28 @@ def download_from_s3(
 
 
 def get_s3_object(bucket_name, file_location, access_key, secret_key, region):
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
-    return s3.Object(bucket_name, file_location)
+    s3 = get_s3_resource()
+    try:
+        return s3.Object(bucket_name, file_location)
+    except botocore.exceptions.ClientError:
+        current_app.logger.error(
+            f"Can't retrieve S3 Object from {file_location}", exc_info=True
+        )
 
 
 def purge_bucket(bucket_name, access_key, secret_key, region):
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
+    s3 = get_s3_resource()
     bucket = s3.Bucket(bucket_name)
     bucket.objects.all().delete()
 
 
-def file_exists(bucket_name, file_location, access_key, secret_key, region):
+def file_exists(file_location):
+    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
+    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
+    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
+    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+
     try:
         # try and access metadata of object
         get_s3_object(
@@ -172,36 +187,58 @@ def get_job_and_metadata_from_s3(service_id, job_id):
 
 
 def get_job_from_s3(service_id, job_id):
+    """
+    If and only if we hit a throttling exception of some kind, we want to try
+    exponential backoff. However, if we are getting NoSuchKey or something
+    that indicates things are permanently broken, we want to give up right away
+    to save time.
+    """
     # We have to make sure the retries don't take up to much time, because
     # we might be retrieving dozens of jobs. So max time is:
     # 0.2 + 0.4 + 0.8 + 1.6 = 3.0 seconds
     retries = 0
     max_retries = 4
     backoff_factor = 0.2
+
+    if not file_exists(FILE_LOCATION_STRUCTURE.format(service_id, job_id)):
+        current_app.logger.error(
+            f"This file does not exist {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
+        )
+        return None
 
     while retries < max_retries:
         try:
             obj = get_s3_object(*get_job_location(service_id, job_id))
             return obj.get()["Body"].read().decode("utf-8")
-        except botocore.exceptions.ClientError:
-            current_app.logger.error(
-                f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
-                exc_info=True,
-            )
-            retries += 1
-            sleep_time = backoff_factor * (2**retries)  # Exponential backoff
-            time.sleep(sleep_time)
-            continue
+        except botocore.exceptions.ClientError as e:
+            if e.response["Error"]["Code"] in [
+                "Throttling",
+                "RequestTimeout",
+                "SlowDown",
+            ]:
+                current_app.logger.error(
+                    f"Retrying job fetch {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
+                    exc_info=True,
+                )
+                retries += 1
+                sleep_time = backoff_factor * (2**retries)  # Exponential backoff
+                time.sleep(sleep_time)
+                continue
+            else:
+                # Typically this is "NoSuchKey"
+                current_app.logger.error(
+                    f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
+                    exc_info=True,
+                )
+                return None
+
         except Exception:
             current_app.logger.error(
                 f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
                 exc_info=True,
             )
-            retries += 1
-            sleep_time = backoff_factor * (2**retries)  # Exponential backoff
-            time.sleep(sleep_time)
-            continue
+            return None
 
     current_app.logger.error(
         f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
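The docstring added above states the intent: only throttling-style failures deserve exponential backoff, while permanent errors such as NoSuchKey should fail fast. A small sketch of the error-code check performed by the new except branch follows; the three retryable codes come from the diff, while the error_response dicts and helper name are made up for illustration.

    from botocore.exceptions import ClientError

    RETRYABLE_CODES = ["Throttling", "RequestTimeout", "SlowDown"]


    def is_retryable(error: ClientError) -> bool:
        # Same test as the new branch: inspect the AWS error code on the response.
        return error.response["Error"]["Code"] in RETRYABLE_CODES


    slow = ClientError({"Error": {"Code": "SlowDown", "Message": "Reduce request rate"}}, "GetObject")
    gone = ClientError({"Error": {"Code": "NoSuchKey", "Message": "Key not found"}}, "GetObject")
    print(is_retryable(slow))  # True  -> sleep with exponential backoff and retry
    print(is_retryable(gone))  # False -> log the failure and return None right away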
@@ -98,16 +98,25 @@ def mock_s3_get_object_slowdown(*args, **kwargs):
     raise ClientError(error_response, "GetObject")
 
 
-def test_get_job_from_s3_exponential_backoff(mocker):
+def test_get_job_from_s3_exponential_backoff_on_throttling(mocker):
     # We try multiple times to retrieve the job, and if we can't we return None
     mock_get_object = mocker.patch(
         "app.aws.s3.get_s3_object", side_effect=mock_s3_get_object_slowdown
     )
+    mocker.patch("app.aws.s3.file_exists", return_value=True)
    job = get_job_from_s3("service_id", "job_id")
     assert job is None
     assert mock_get_object.call_count == 4
 
 
+def test_get_job_from_s3_exponential_backoff_file_not_found(mocker):
+    mock_get_object = mocker.patch("app.aws.s3.get_s3_object", return_value=None)
+    mocker.patch("app.aws.s3.file_exists", return_value=False)
+    job = get_job_from_s3("service_id", "job_id")
+    assert job is None
+    assert mock_get_object.call_count == 0
+
+
 @pytest.mark.parametrize(
     "job, job_id, job_row_number, expected_personalisation",
     [
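The throttling test drives get_job_from_s3 through mock_s3_get_object_slowdown, whose body lies mostly outside this hunk; only the final raise line is visible above. A guess at its overall shape, with the error_response dict assumed rather than copied from the repository:

    from botocore.exceptions import ClientError


    def mock_s3_get_object_slowdown(*args, **kwargs):
        # Simulate S3 throttling so get_job_from_s3 keeps retrying until it gives up.
        error_response = {
            "Error": {"Code": "SlowDown", "Message": "Reduce your request rate"}
        }
        raise ClientError(error_response, "GetObject")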
@@ -180,19 +189,9 @@ def test_file_exists_true(notify_api, mocker):
     get_s3_mock = mocker.patch("app.aws.s3.get_s3_object")
 
     file_exists(
-        os.getenv("CSV_BUCKET_NAME"),
         "mykey",
-        default_access_key,
-        default_secret_key,
-        default_region,
     )
-    get_s3_mock.assert_called_once_with(
-        os.getenv("CSV_BUCKET_NAME"),
-        "mykey",
-        default_access_key,
-        default_secret_key,
-        default_region,
-    )
+    get_s3_mock.assert_called_once()
 
 
 def test_file_exists_false(notify_api, mocker):
@@ -207,17 +206,7 @@ def test_file_exists_false(notify_api, mocker):
 
     with pytest.raises(ClientError):
         file_exists(
-            os.getenv("CSV_BUCKET_NAME"),
             "mykey",
-            default_access_key,
-            default_secret_key,
-            default_region,
         )
 
-    get_s3_mock.assert_called_once_with(
-        os.getenv("CSV_BUCKET_NAME"),
-        "mykey",
-        default_access_key,
-        default_secret_key,
-        default_region,
-    )
+    get_s3_mock.assert_called_once()