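"""S3 helpers for the CSV upload bucket.

Handles reading, downloading and removing job CSV files from S3, plus an
in-memory expiring cache (JOBS) used to look up phone numbers and
personalisation for a given job row without repeatedly downloading the file.
"""
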
import re
import time

import botocore
from boto3 import Session
from expiringdict import ExpiringDict
from flask import current_app

from app import redis_store
from app.clients import AWS_CLIENT_CONFIG

FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"

# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7
JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)

JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"


def list_s3_objects():
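    """Yield every object key in the CSV upload bucket, following S3 continuation tokens to page through results."""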
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
    session = Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    s3 = session.client("s3")
    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        while True:
            for obj in response.get("Contents", []):
                yield obj["Key"]
            if "NextContinuationToken" in response:
                response = s3.list_objects_v2(
                    Bucket=bucket_name,
                    ContinuationToken=response["NextContinuationToken"],
                )
            else:
                break
    except Exception as e:
        current_app.logger.error(
            f"An error occurred while regenerating cache #notify-admin-1200 {e}"
        )


def get_s3_files():
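    """Regenerate the in-memory JOBS cache by reading each job CSV in the bucket that is not already cached."""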
    current_app.logger.info("Regenerate job cache #notify-admin-1200")
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
    session = Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    objects = list_s3_objects()
    s3res = session.resource("s3", config=AWS_CLIENT_CONFIG)
    current_app.logger.info(
        f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
    )
    for key in objects:
        # We put our csv files in the format "service-{service_id}-notify/{job_id}.csv"
        try:
            object_arr = key.split("/")
            job_id = object_arr[1]  # get the job_id
            job_id = job_id.replace(".csv", "")  # we just want the job_id
            if JOBS.get(job_id) is None:
                contents = (
                    s3res.Object(bucket_name, key)
                    .get()["Body"]
                    .read()
                    .decode("utf-8")
                )
                if "phone number" in contents.lower():
                    JOBS[job_id] = contents
        except LookupError as le:
            # Perhaps our key is not formatted as we expected. If so, skip it.
            current_app.logger.error(f"LookupError {le} #notify-admin-1200")
    current_app.logger.info(
        f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
    )


def get_s3_file(bucket_name, file_location, access_key, secret_key, region):
    s3_file = get_s3_object(bucket_name, file_location, access_key, secret_key, region)
    return s3_file.get()["Body"].read().decode("utf-8")


def download_from_s3(
    bucket_name, s3_key, local_filename, access_key, secret_key, region
):
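    """Download an object from S3 to local_filename, logging and re-raising any credential or download error."""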
    session = Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    s3 = session.client("s3", config=AWS_CLIENT_CONFIG)
    result = None
    try:
        result = s3.download_file(bucket_name, s3_key, local_filename)
        current_app.logger.info(f"File downloaded successfully to {local_filename}")
    except botocore.exceptions.NoCredentialsError as nce:
        current_app.logger.error("Credentials not found")
        raise Exception(nce)
    except botocore.exceptions.PartialCredentialsError as pce:
        current_app.logger.error("Incomplete credentials provided")
        raise Exception(pce)
    except Exception as e:
        current_app.logger.error(f"An error occurred {e}")
        text = f"EXCEPTION {e} local_filename {local_filename}"
        raise Exception(text)
    return result


def get_s3_object(bucket_name, file_location, access_key, secret_key, region):
    session = Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
    return s3.Object(bucket_name, file_location)


def purge_bucket(bucket_name, access_key, secret_key, region):
    session = Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    s3 = session.resource("s3", config=AWS_CLIENT_CONFIG)
    bucket = s3.Bucket(bucket_name)
    bucket.objects.all().delete()


def file_exists(bucket_name, file_location, access_key, secret_key, region):
    try:
        # Try to access the object's metadata; a missing object raises a 404 ClientError.
        get_s3_object(
            bucket_name, file_location, access_key, secret_key, region
        ).metadata
        return True
    except botocore.exceptions.ClientError as e:
        if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
            return False
        raise


def get_job_location(service_id, job_id):
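    """Return the (bucket, key, access_key, secret_key, region) tuple that get_s3_object expects for a job's CSV."""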
    return (
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        FILE_LOCATION_STRUCTURE.format(service_id, job_id),
        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
    )


def get_job_and_metadata_from_s3(service_id, job_id):
    obj = get_s3_object(*get_job_location(service_id, job_id))
    response = obj.get()
    return response["Body"].read().decode("utf-8"), response["Metadata"]


def get_job_from_s3(service_id, job_id):
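    """Fetch a job's CSV contents from S3 as a string, retrying with exponential backoff; return None if every attempt fails."""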
    # We have to make sure the retries don't take up too much time, because
    # we might be retrieving dozens of jobs. Worst case, the sleeps add up to:
    # 0.4 + 0.8 + 1.6 + 3.2 = 6.0 seconds
    retries = 0
    max_retries = 4
    backoff_factor = 0.2
    while retries < max_retries:
        try:
            obj = get_s3_object(*get_job_location(service_id, job_id))
            return obj.get()["Body"].read().decode("utf-8")
        except Exception:
            # Covers botocore.exceptions.ClientError as well as anything unexpected.
            current_app.logger.error(
                f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
                exc_info=True,
            )
            retries += 1
            sleep_time = backoff_factor * (2**retries)  # Exponential backoff
            time.sleep(sleep_time)

    current_app.logger.error(
        f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
    )
    return None


def incr_jobs_cache_misses():
    if not redis_store.get(JOBS_CACHE_MISSES):
        redis_store.set(JOBS_CACHE_MISSES, 1)
    else:
        redis_store.incr(JOBS_CACHE_MISSES)


def incr_jobs_cache_hits():
    if not redis_store.get(JOBS_CACHE_HITS):
        redis_store.set(JOBS_CACHE_HITS, 1)
    else:
        redis_store.incr(JOBS_CACHE_HITS)


def extract_phones(job):
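    """Map each data-row index of a job CSV to its phone number with formatting characters stripped.

    For example, for a job whose header row is "phone number,name" and whose only
    data row is "(555) 123-4567,Alice" (hypothetical data), the result is
    {0: "5551234567"}. Rows missing the phone column map to "Unavailable".
    """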
    job = job.split("\r\n")
    first_row = job[0]
    job.pop(0)
    first_row = first_row.split(",")
    phone_index = 0
    for item in first_row:
        # Note: may contain a BOM and look like \ufeffphone number
        if "phone number" in item.lower():
            break
        phone_index = phone_index + 1

    phones = {}
    job_row = 0
    for row in job:
        row = row.split(",")
        if phone_index >= len(row):
            phones[job_row] = "Unavailable"
            current_app.logger.error(
                "Corrupt csv file, missing columns or possibly a byte order mark in the file"
            )
        else:
            my_phone = row[phone_index]
            my_phone = re.sub(r"[\+\s\(\)\-\.]*", "", my_phone)
            phones[job_row] = my_phone
        job_row = job_row + 1
    return phones


def extract_personalisation(job):
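    """Map each data-row index of a job CSV to a dict of header -> value for that row.

    For example, a job with header row "phone number,name" and data row
    "15551234567,Alice" (hypothetical data) yields
    {0: {"phone number": "15551234567", "name": "Alice"}}.
    """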
    job = job.split("\r\n")
    first_row = job[0]
    job.pop(0)
    first_row = first_row.split(",")
    personalisation = {}
    job_row = 0
    for row in job:
        row = row.split(",")
        temp = dict(zip(first_row, row))
        personalisation[job_row] = temp
        job_row = job_row + 1
    return personalisation


def get_phone_number_from_s3(service_id, job_id, job_row_number):
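    """Return the phone number for the given row of a job, or "Unavailable" if it cannot be found.

    Uses the JOBS cache (and a per-job lookup dictionary stored under
    f"{job_id}_phones") to avoid re-downloading the CSV from S3 for every row.
    """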
    # We don't want to constantly pull down a job from s3 every time we need a phone number.
    # At the same time we don't want to store it in redis or the db
    # So this is a little recycling mechanism to reduce the number of downloads.
    job = JOBS.get(job_id)
    if job is None:
        current_app.logger.info(f"job {job_id} was not in the cache")
        job = get_job_from_s3(service_id, job_id)
        JOBS[job_id] = job
        incr_jobs_cache_misses()
    else:
        incr_jobs_cache_hits()

    # If the job is None after our attempt to retrieve it from s3, it
    # probably means the job is old and has been deleted from s3, in
    # which case there is nothing we can do. It's unlikely to run into
    # this, but it could theoretically happen, especially if we ever
    # change the task schedules
    if job is None:
        current_app.logger.warning(
            f"Couldn't find phone for job_id {job_id} row number {job_row_number} because job is missing"
        )
        return "Unavailable"

    # If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
    # and that dictionary is not there, create it
    if JOBS.get(f"{job_id}_phones") is None:
        JOBS[f"{job_id}_phones"] = extract_phones(job)

    # If we can find the quick dictionary, use it
    if JOBS.get(f"{job_id}_phones") is not None:
        phone_to_return = JOBS.get(f"{job_id}_phones").get(job_row_number)
        if phone_to_return:
            return phone_to_return
        else:
            current_app.logger.warning(
                f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
            )
            return "Unavailable"
    else:
        current_app.logger.error(
            f"Was unable to construct lookup dictionary for job {job_id}"
        )
        return "Unavailable"


def get_personalisation_from_s3(service_id, job_id, job_row_number):
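    """Return the personalisation dict for the given row of a job, or {} if it cannot be found.

    Uses the JOBS cache (and a per-job lookup dictionary stored under
    f"{job_id}_personalisation") to avoid re-downloading the CSV from S3 for every row.
    """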
    # We don't want to constantly pull down a job from s3 every time we need the personalisation.
    # At the same time we don't want to store it in redis or the db
    # So this is a little recycling mechanism to reduce the number of downloads.
    job = JOBS.get(job_id)
    if job is None:
        job = get_job_from_s3(service_id, job_id)
        JOBS[job_id] = job
        incr_jobs_cache_misses()
    else:
        incr_jobs_cache_hits()

    # If the job is None after our attempt to retrieve it from s3, it
    # probably means the job is old and has been deleted from s3, in
    # which case there is nothing we can do. It's unlikely to run into
    # this, but it could theoretically happen, especially if we ever
    # change the task schedules
    if job is None:
        current_app.logger.warning(
            f"Couldn't find personalisation for job_id {job_id} row number {job_row_number} because job is missing"
        )
        return {}

    # If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
    # and that dictionary is not there, create it
    if JOBS.get(f"{job_id}_personalisation") is None:
        JOBS[f"{job_id}_personalisation"] = extract_personalisation(job)

    # If we can find the quick dictionary, use it
    if JOBS.get(f"{job_id}_personalisation") is not None:
        personalisation_to_return = JOBS.get(f"{job_id}_personalisation").get(
            job_row_number
        )
        if personalisation_to_return:
            return personalisation_to_return
        else:
            current_app.logger.warning(
                f"Was unable to retrieve personalisation from lookup dictionary for job {job_id}"
            )
            return {}
    else:
        current_app.logger.error(
            f"Was unable to construct lookup dictionary for job {job_id}"
        )
        return {}


def get_job_metadata_from_s3(service_id, job_id):
    obj = get_s3_object(*get_job_location(service_id, job_id))
    return obj.get()["Metadata"]


def remove_job_from_s3(service_id, job_id):
    return remove_s3_object(*get_job_location(service_id, job_id))


def remove_s3_object(bucket_name, object_key, access_key, secret_key, region):
    obj = get_s3_object(bucket_name, object_key, access_key, secret_key, region)
    return obj.delete()


def remove_csv_object(object_key):
    obj = get_s3_object(
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        object_key,
        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
    )
    return obj.delete()