Merge pull request #1277 from GSA/more_backoff
improve exponential backoff while job hunting
app/aws/s3.py
@@ -19,26 +19,52 @@ JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)
 JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
 JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"

+# Global variable
+s3_client = None
+s3_resource = None
+
+
+def get_s3_client():
+    global s3_client
+    if s3_client is None:
+        access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
+        secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
+        region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+        session = Session(
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            region_name=region,
+        )
+        s3_client = session.client("s3")
+    return s3_client
+
+
+def get_s3_resource():
+    global s3_resource
+    if s3_resource is None:
+        access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
+        secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
+        region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+        session = Session(
+            aws_access_key_id=access_key,
+            aws_secret_access_key=secret_key,
+            region_name=region,
+        )
+        s3_resource = session.resource("s3", config=AWS_CLIENT_CONFIG)
+    return s3_resource
+
+
 def list_s3_objects():
-    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
-    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
-    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.client("s3")
-
+    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
+    s3_client = get_s3_client()
     try:
-        response = s3.list_objects_v2(Bucket=bucket_name)
+        response = s3_client.list_objects_v2(Bucket=bucket_name)
         while True:
             for obj in response.get("Contents", []):
                 yield obj["Key"]
             if "NextContinuationToken" in response:
-                response = s3.list_objects_v2(
+                response = s3_client.list_objects_v2(
                     Bucket=bucket_name,
                     ContinuationToken=response["NextContinuationToken"],
                 )
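Both new getters memoize a module-level boto3 client or resource, so Session construction and credential lookup happen once per process instead of on every call. The hand-rolled continuation-token loop in list_s3_objects could also be written with boto3's built-in paginator, which follows NextContinuationToken internally. A minimal sketch, assuming the cached client added above; the helper name is illustrative, not part of this PR:

    def list_s3_objects_paginated(bucket_name):
        # The paginator follows continuation tokens for us,
        # yielding one page of results at a time.
        paginator = get_s3_client().get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get("Contents", []):
                yield obj["Key"]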
@@ -51,19 +77,11 @@ def list_s3_objects():
 def get_s3_files():
     current_app.logger.info("Regenerate job cache #notify-admin-1200")

     bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
-    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
-    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
     objects = list_s3_objects()

-    s3res = session.resource("s3", config=AWS_CLIENT_CONFIG)
+    s3res = get_s3_resource()
     current_app.logger.info(
         f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
     )
@@ -99,12 +117,8 @@ def get_s3_file(bucket_name, file_location, access_key, secret_key, region):
 def download_from_s3(
     bucket_name, s3_key, local_filename, access_key, secret_key, region
 ):
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.client("s3", config=AWS_CLIENT_CONFIG)
-
+    s3 = get_s3_client()
     result = None
     try:
         result = s3.download_file(bucket_name, s3_key, local_filename)
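A subtlety kept from the old code: the client's download_file writes the object to local_filename and returns None, so result stays None on success; the call signals failure only by raising. A hypothetical wrapper that hands back something useful instead:

    def download_to(bucket_name, s3_key, local_filename):
        # download_file returns None; not raising means success,
        # so return the local path for convenience.
        get_s3_client().download_file(bucket_name, s3_key, local_filename)
        return local_filename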
@@ -123,27 +137,28 @@ def download_from_s3(
 def get_s3_object(bucket_name, file_location, access_key, secret_key, region):
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
-    return s3.Object(bucket_name, file_location)
+    s3 = get_s3_resource()
+    try:
+        return s3.Object(bucket_name, file_location)
+    except botocore.exceptions.ClientError:
+        current_app.logger.error(
+            f"Can't retrieve S3 Object from {file_location}", exc_info=True
+        )


 def purge_bucket(bucket_name, access_key, secret_key, region):
-    session = Session(
-        aws_access_key_id=access_key,
-        aws_secret_access_key=secret_key,
-        region_name=region,
-    )
-    s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
+    s3 = get_s3_resource()
     bucket = s3.Bucket(bucket_name)
     bucket.objects.all().delete()


-def file_exists(bucket_name, file_location, access_key, secret_key, region):
+def file_exists(file_location):
+    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
+    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
+    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
+    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
+
     try:
         # try and access metadata of object
         get_s3_object(
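file_exists now reads the bucket settings from app config and probes the object's metadata through get_s3_object. A common equivalent idiom is a HEAD request on the key; a sketch assuming the cached client above (object_exists is an illustrative name, not the repository's API):

    import botocore.exceptions

    def object_exists(bucket_name, key):
        # HEAD fetches only metadata, no object body.
        try:
            get_s3_client().head_object(Bucket=bucket_name, Key=key)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "404":
                return False
            raise  # auth or permission errors should not read as "missing"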
@@ -172,9 +187,25 @@ def get_job_and_metadata_from_s3(service_id, job_id):
 def get_job_from_s3(service_id, job_id):
+    """
+    If and only if we hit a throttling exception of some kind, we want to try
+    exponential backoff. However, if we are getting NoSuchKey or something
+    that indicates things are permanently broken, we want to give up right away
+    to save time.
+    """
+    # We have to make sure the retries don't take up to much time, because
+    # we might be retrieving dozens of jobs. So max time is:
+    # 0.2 + 0.4 + 0.8 + 1.6 = 3.0 seconds
     retries = 0
-    max_retries = 3
-    backoff_factor = 1
+    max_retries = 4
+    backoff_factor = 0.2
+
+    if not file_exists(FILE_LOCATION_STRUCTURE.format(service_id, job_id)):
+        current_app.logger.error(
+            f"This file does not exist {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
+        )
+        return None
+
     while retries < max_retries:

         try:
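Worth noting: retries is incremented before the sleep is computed, so the first sleep is 0.2 * 2**1 = 0.4 seconds; the comment's 0.2 + 0.4 + 0.8 + 1.6 = 3.0 figure assumes sleeping before the doubling. Replaying the loop's arithmetic as written, for the worst case where every attempt hits a throttling error (no S3 calls, no real sleeping):

    backoff_factor = 0.2
    max_retries = 4
    retries = 0
    total = 0.0
    while retries < max_retries:
        retries += 1  # incremented before the sleep is computed
        sleep_time = backoff_factor * (2**retries)
        total += sleep_time
        print(f"after attempt {retries}: sleep {sleep_time:.1f}s")
    print(f"worst-case total sleep: {total:.1f}s")
    # after attempt 1: 0.4s, attempt 2: 0.8s, attempt 3: 1.6s, attempt 4: 3.2s
    # worst-case total sleep: 6.0s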
@@ -186,24 +217,34 @@ def get_job_from_s3(service_id, job_id):
                 "RequestTimeout",
                 "SlowDown",
             ]:
+                current_app.logger.error(
+                    f"Retrying job fetch {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
+                    exc_info=True,
+                )
                 retries += 1
                 sleep_time = backoff_factor * (2**retries)  # Exponential backoff
                 time.sleep(sleep_time)
                 continue
             else:
+                # Typically this is "NoSuchKey"
                 current_app.logger.error(
-                    f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} from bucket",
+                    f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
                     exc_info=True,
                 )
                 return None

         except Exception:
             current_app.logger.error(
-                f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} from bucket",
+                f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
                 exc_info=True,
             )
             return None

-    raise Exception("Failed to get object after 3 attempts")
+    current_app.logger.error(
+        f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
+        exc_info=True,
+    )
+    return None


 def incr_jobs_cache_misses():
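The shape of the loop (retry only on throttling codes, fail fast on anything permanent) distills into a small reusable helper. A sketch under stated assumptions; fetch_with_backoff and its parameters are illustrative, not this PR's API:

    import time
    import botocore.exceptions

    RETRYABLE = {"RequestTimeout", "SlowDown"}

    def fetch_with_backoff(fetch, max_retries=4, backoff_factor=0.2):
        # Retry throttling-style ClientErrors with exponential backoff;
        # permanent errors such as NoSuchKey propagate immediately.
        for attempt in range(1, max_retries + 1):
            try:
                return fetch()
            except botocore.exceptions.ClientError as e:
                if e.response["Error"]["Code"] not in RETRYABLE:
                    raise
                time.sleep(backoff_factor * (2**attempt))
        return None  # retries exhausted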
@@ -274,19 +315,15 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
     if job is None:
         current_app.logger.info(f"job {job_id} was not in the cache")
         job = get_job_from_s3(service_id, job_id)
         # Even if it is None, put it here to avoid KeyErrors
         JOBS[job_id] = job
         incr_jobs_cache_misses()
     else:
         incr_jobs_cache_hits()

     # If the job is None after our attempt to retrieve it from s3, it
     # probably means the job is old and has been deleted from s3, in
     # which case there is nothing we can do. It's unlikely to run into
     # this, but it could theoretically happen, especially if we ever
     # change the task schedules
     if job is None:
-        current_app.logger.warning(
-            f"Couldnt find phone for job_id {job_id} row number {job_row_number} because job is missing"
+        current_app.logger.error(
+            f"Couldnt find phone for job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} because job is missing"
         )
         return "Unavailable"
@@ -331,7 +368,7 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
     # change the task schedules
     if job is None:
         current_app.logger.warning(
-            "Couldnt find personalisation for job_id {job_id} row number {job_row_number} because job is missing"
+            f"Couldnt find personalisation for job_id {job_id} row number {job_row_number} because job is missing"
         )
         return {}
@@ -1,7 +1,6 @@
 import itertools
 from datetime import datetime, timedelta

-from botocore.exceptions import ClientError
 from flask import Blueprint, current_app, jsonify, request
 from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm.exc import NoResultFound
@@ -503,37 +502,24 @@ def get_all_notifications_for_service(service_id):
     for notification in pagination.items:
         if notification.job_id is not None:
-            try:
-                notification.personalisation = get_personalisation_from_s3(
-                    notification.service_id,
-                    notification.job_id,
-                    notification.job_row_number,
-                )
-            except ClientError as ex:
-                if ex.response["Error"]["Code"] == "NoSuchKey":
-                    notification.personalisation = ""
-                else:
-                    raise ex
+            notification.personalisation = get_personalisation_from_s3(
+                notification.service_id,
+                notification.job_id,
+                notification.job_row_number,
+            )

-            try:
-                recipient = get_phone_number_from_s3(
-                    notification.service_id,
-                    notification.job_id,
-                    notification.job_row_number,
-                )
-
-                notification.to = recipient
-                notification.normalised_to = recipient
-            except ClientError as ex:
-                if ex.response["Error"]["Code"] == "NoSuchKey":
-                    notification.to = ""
-                    notification.normalised_to = ""
-                else:
-                    raise ex
+            recipient = get_phone_number_from_s3(
+                notification.service_id,
+                notification.job_id,
+                notification.job_row_number,
+            )
+
+            notification.to = recipient
+            notification.normalised_to = recipient

         else:
-            notification.to = "1"
-            notification.normalised_to = "1"
+            notification.to = ""
+            notification.normalised_to = ""

     kwargs = request.args.to_dict()
     kwargs["service_id"] = service_id
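The view drops its ClientError handling because the S3 helpers now absorb NoSuchKey themselves and hand back sentinels ("Unavailable", {}, or None), so callers assign results directly. A self-contained sketch of that error-absorbing pattern (names hypothetical):

    def fetch_or_default(fetch, default):
        # Absorb "missing" errors at the data layer so view code
        # never needs try/except around each lookup.
        try:
            return fetch()
        except KeyError:  # stand-in for a NoSuchKey ClientError
            return default

    row = {"phone_number": "+12025550123"}
    print(fetch_or_default(lambda: row["phone_number"], "Unavailable"))
    print(fetch_or_default(lambda: row["personalisation"], {}))  # -> {}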