mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-01 15:46:07 -05:00
add retry with backoff
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import re
|
||||
import time
|
||||
|
||||
import botocore
|
||||
from boto3 import Session
|
||||
@@ -107,15 +108,15 @@ def download_from_s3(
|
||||
result = None
|
||||
try:
|
||||
result = s3.download_file(bucket_name, s3_key, local_filename)
|
||||
print(f"File downloaded successfully to {local_filename}")
|
||||
current_app.logger.info(f"File downloaded successfully to {local_filename}")
|
||||
except botocore.exceptions.NoCredentialsError as nce:
|
||||
print("Credentials not found")
|
||||
current_app.logger.error("Credentials not found")
|
||||
raise Exception(nce)
|
||||
except botocore.exceptions.PartialCredentialsError as pce:
|
||||
print("Incomplete credentials provided")
|
||||
current_app.logger.error("Incomplete credentials provided")
|
||||
raise Exception(pce)
|
||||
except Exception as e:
|
||||
print(f"An error occurred {e}")
|
||||
current_app.logger.error(f"An error occurred {e}")
|
||||
text = f"EXCEPTION {e} local_filename {local_filename}"
|
||||
raise Exception(text)
|
||||
return result
|
||||
@@ -171,8 +172,29 @@ def get_job_and_metadata_from_s3(service_id, job_id):
|
||||
|
||||
|
||||
def get_job_from_s3(service_id, job_id):
|
||||
obj = get_s3_object(*get_job_location(service_id, job_id))
|
||||
return obj.get()["Body"].read().decode("utf-8")
|
||||
retries = 0
|
||||
max_retries = 5
|
||||
backoff_factor = 1
|
||||
while retries < max_retries:
|
||||
|
||||
try:
|
||||
obj = get_s3_object(*get_job_location(service_id, job_id))
|
||||
return obj.get()["Body"].read().decode("utf-8")
|
||||
except botocore.exceptions.ClientError as e:
|
||||
if e.response["Error"]["Code"] in [
|
||||
"Throttling",
|
||||
"RequestTimeout",
|
||||
"SlowDown",
|
||||
]:
|
||||
retries += 1
|
||||
sleep_time = backoff_factor * (2**retries) # Exponential backoff
|
||||
time.sleep(sleep_time)
|
||||
continue
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"Failed to get object from bucket {e}")
|
||||
raise
|
||||
|
||||
raise Exception(f"Failed to get object after 5 attempts")
|
||||
|
||||
|
||||
def incr_jobs_cache_misses():
|
||||
@@ -241,6 +263,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
|
||||
# So this is a little recycling mechanism to reduce the number of downloads.
|
||||
job = JOBS.get(job_id)
|
||||
if job is None:
|
||||
current_app.logger.info(f"job {job_id} was not in the cache")
|
||||
job = get_job_from_s3(service_id, job_id)
|
||||
JOBS[job_id] = job
|
||||
incr_jobs_cache_misses()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import json
|
||||
from datetime import timedelta
|
||||
from datetime import datetime, timedelta
|
||||
from os import getenv, path
|
||||
|
||||
from celery.schedules import crontab
|
||||
@@ -165,6 +165,8 @@ class Config(object):
|
||||
# we only need real email in Live environment (production)
|
||||
DVLA_EMAIL_ADDRESSES = json.loads(getenv("DVLA_EMAIL_ADDRESSES", "[]"))
|
||||
|
||||
current_minute = (datetime.now().minute + 1) % 60
|
||||
|
||||
CELERY = {
|
||||
"broker_url": REDIS_URL,
|
||||
"broker_transport_options": {
|
||||
@@ -254,6 +256,16 @@ class Config(object):
|
||||
"schedule": crontab(minute="*/30"),
|
||||
"options": {"queue": QueueNames.PERIODIC},
|
||||
},
|
||||
"regenerate-job-cache-on-startup": {
|
||||
"task": "regenerate-job-cache",
|
||||
"schedule": crontab(
|
||||
minute=current_minute
|
||||
), # Runs once at the next minute
|
||||
"options": {
|
||||
"queue": QueueNames.PERIODIC,
|
||||
"expires": 60,
|
||||
}, # Ensure it doesn't run if missed
|
||||
},
|
||||
"cleanup-unfinished-jobs": {
|
||||
"task": "cleanup-unfinished-jobs",
|
||||
"schedule": crontab(hour=4, minute=5),
|
||||
|
||||
@@ -511,11 +511,6 @@ def get_all_notifications_for_service(service_id):
|
||||
)
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
s = notification.service_id
|
||||
j = notification.job_id
|
||||
current_app.logger.warning(
|
||||
f"No personalisation found for s3 file location service: service-{s}-notify/{j}.csv"
|
||||
)
|
||||
notification.personalisation = ""
|
||||
else:
|
||||
raise ex
|
||||
@@ -531,11 +526,6 @@ def get_all_notifications_for_service(service_id):
|
||||
notification.normalised_to = recipient
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
s = notification.service_id
|
||||
j = notification.job_id
|
||||
current_app.logger.warning(
|
||||
f"No phone number found for s3 file location service: service-{s}-notify/{j}.csv"
|
||||
)
|
||||
notification.to = ""
|
||||
notification.normalised_to = ""
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user