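"""
Utilities for working with job CSV files stored in the CSV upload S3 bucket:
lazily created boto3 clients/resources, a process-shared job_cache of
recently fetched job contents (plus derived phone-number and personalisation
lookups), and helpers for fetching, listing, and deleting objects.

Typical use (a sketch, assuming a Flask application context and a configured
CSV_UPLOAD_BUCKET):

    contents = get_job_from_s3(service_id, job_id)
    phone = get_phone_number_from_s3(service_id, job_id, job_row_number)
"""
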
import datetime
import re
import time
from multiprocessing import Manager

import botocore
from boto3 import Session
from flask import current_app

from app.clients import AWS_CLIENT_CONFIG
from notifications_utils import aware_utcnow

FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"

# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7

manager = Manager()
job_cache = manager.dict()

# Lazily created, shared boto3 client/resource (see get_s3_client / get_s3_resource)
s3_client = None
s3_resource = None


def set_job_cache(job_cache, key, value):
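    """Store value in the cache under key, together with an expiry timestamp 8 days in the future."""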
    job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)


def clean_cache():
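    """Delete every job_cache entry whose expiry timestamp has passed."""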
    current_time = time.time()
    keys_to_delete = []
    for key, (_, expiry_time) in job_cache.items():
        if expiry_time < current_time:
            keys_to_delete.append(key)

    for key in keys_to_delete:
        del job_cache[key]


def get_s3_client():
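    """Return the shared boto3 S3 client, creating it from the CSV_UPLOAD_BUCKET config on first use."""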
    global s3_client
    if s3_client is None:
        access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
        secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
        region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
        session = Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
        )
        s3_client = session.client("s3")
    return s3_client


def get_s3_resource():
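    """Return the shared boto3 S3 resource, creating it from the CSV_UPLOAD_BUCKET config on first use."""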
    global s3_resource
    if s3_resource is None:
        access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
        secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
        region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
        session = Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
        )
        s3_resource = session.resource("s3", config=AWS_CLIENT_CONFIG)
    return s3_resource


def list_s3_objects():
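    """
    Yield the keys of objects in the CSV upload bucket that were modified
    within the last 8 days, following list_objects_v2 pagination.
    """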
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    s3_client = get_s3_client()
    # Our reports only support 7 days, but pull 8 days to avoid
    # any edge cases
    time_limit = aware_utcnow() - datetime.timedelta(days=8)
    try:
        response = s3_client.list_objects_v2(Bucket=bucket_name)
        while True:
            for obj in response.get("Contents", []):
                if obj["LastModified"] >= time_limit:
                    yield obj["Key"]
            if "NextContinuationToken" in response:
                response = s3_client.list_objects_v2(
                    Bucket=bucket_name,
                    ContinuationToken=response["NextContinuationToken"],
                )
            else:
                break
    except Exception:
        current_app.logger.exception(
            "An error occurred while regenerating cache #notify-admin-1200",
        )


def get_bucket_name():
    return current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]


def cleanup_old_s3_objects():
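    """Delete objects from the CSV upload bucket that were last modified more than 14 days ago."""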
    bucket_name = get_bucket_name()
    s3_client = get_s3_client()
    # Our reports only support 7 days, but jobs can be scheduled 3 days in advance.
    # Use 14 days for the v1.0 version of this behavior.
    time_limit = aware_utcnow() - datetime.timedelta(days=14)
    try:
        response = s3_client.list_objects_v2(Bucket=bucket_name)
        while True:
            for obj in response.get("Contents", []):
                if obj["LastModified"] <= time_limit:
                    try:
                        remove_csv_object(obj["Key"])
                        current_app.logger.info(
                            f"#delete-old-s3-objects Deleted: {obj['LastModified']} {obj['Key']}"
                        )
                    except botocore.exceptions.ClientError:
                        current_app.logger.exception(f"Couldn't delete {obj['Key']}")

            if "NextContinuationToken" in response:
                response = s3_client.list_objects_v2(
                    Bucket=bucket_name,
                    ContinuationToken=response["NextContinuationToken"],
                )
            else:
                break
    except Exception:
        current_app.logger.exception(
            "#delete-old-s3-objects An error occurred while cleaning up old s3 objects",
        )


def get_s3_files():
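    """
    Warm job_cache with the contents of recent job CSVs that are not already
    cached. Only files whose contents mention "phone number" are stored.
    """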
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    objects = list_s3_objects()

    s3res = get_s3_resource()
    current_app.logger.info(
        f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
    )
    for key in objects:
        # We put our csv files in the format "service-{service_id}-notify/{job_id}"
        try:
            object_arr = key.split("/")
            job_id = object_arr[1]  # get the job_id
            job_id = job_id.replace(".csv", "")  # we just want the job_id
            if job_cache.get(job_id) is None:
                contents = (
                    s3res.Object(bucket_name, key)
                    .get()["Body"]
                    .read()
                    .decode("utf-8")
                )
                if "phone number" in contents.lower():
                    set_job_cache(job_cache, job_id, contents)
        except LookupError:
            # perhaps our key is not formatted as we expected. If so skip it.
            current_app.logger.exception("LookupError #notify-admin-1200")

    current_app.logger.info(
        f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
    )


def get_s3_file(bucket_name, file_location, access_key, secret_key, region):
    s3_file = get_s3_object(bucket_name, file_location, access_key, secret_key, region)
    return s3_file.get()["Body"].read().decode("utf-8")


def download_from_s3(
    bucket_name, s3_key, local_filename, access_key, secret_key, region
):
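    """
    Download the object at s3_key in bucket_name to local_filename on disk.

    Credential problems and other errors are logged and re-raised as plain
    Exceptions.
    """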
    s3 = get_s3_client()
    result = None
    try:
        result = s3.download_file(bucket_name, s3_key, local_filename)
        current_app.logger.info(f"File downloaded successfully to {local_filename}")
    except botocore.exceptions.NoCredentialsError as nce:
        current_app.logger.exception("Credentials not found")
        raise Exception(nce)
    except botocore.exceptions.PartialCredentialsError as pce:
        current_app.logger.exception("Incomplete credentials provided")
        raise Exception(pce)
    except Exception:
        current_app.logger.exception("An error occurred")
        text = f"EXCEPTION local_filename {local_filename}"
        raise Exception(text)
    return result


def get_s3_object(bucket_name, file_location, access_key, secret_key, region):
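    """
    Return a boto3 Object resource for file_location in bucket_name.

    If the resource cannot be created, the ClientError is logged and the
    function falls through, returning None.
    """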
    s3 = get_s3_resource()
    try:
        return s3.Object(bucket_name, file_location)
    except botocore.exceptions.ClientError:
        current_app.logger.exception(
            f"Can't retrieve S3 Object from {file_location}",
        )


def purge_bucket(bucket_name, access_key, secret_key, region):
    s3 = get_s3_resource()
    bucket = s3.Bucket(bucket_name)
    bucket.objects.all().delete()


def file_exists(file_location):
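    """
    Return True if file_location exists in the CSV upload bucket, False if S3
    responds with a 404; any other ClientError is re-raised.
    """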
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
    secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
    region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]

    try:
        # try and access metadata of object
        get_s3_object(
            bucket_name, file_location, access_key, secret_key, region
        ).metadata
        return True
    except botocore.exceptions.ClientError as e:
        if e.response["ResponseMetadata"]["HTTPStatusCode"] == 404:
            return False
        raise


def get_job_location(service_id, job_id):
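    """
    Return the (bucket, key, access key id, secret access key, region) tuple
    for a job's CSV, in the order expected by get_s3_object, so callers can
    unpack it with *.
    """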
    return (
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        FILE_LOCATION_STRUCTURE.format(service_id, job_id),
        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
    )


def get_job_and_metadata_from_s3(service_id, job_id):
    obj = get_s3_object(*get_job_location(service_id, job_id))
    return obj.get()["Body"].read().decode("utf-8"), obj.get()["Metadata"]


def get_job_from_s3(service_id, job_id):
    """
    If and only if we hit a throttling exception of some kind, we want to try
    exponential backoff. However, if we are getting NoSuchKey or something
    that indicates things are permanently broken, we want to give up right away
    to save time.
    """
    # We have to make sure the retries don't take too much time, because
    # we might be retrieving dozens of jobs. So the maximum total sleep is:
    # 0.4 + 0.8 + 1.6 + 3.2 = 6.0 seconds
    retries = 0
    max_retries = 4
    backoff_factor = 0.2

    if not file_exists(FILE_LOCATION_STRUCTURE.format(service_id, job_id)):
        current_app.logger.error(
            f"This file does not exist {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
        )
        return None

    while retries < max_retries:
        try:
            obj = get_s3_object(*get_job_location(service_id, job_id))
            return obj.get()["Body"].read().decode("utf-8")
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] in [
                "Throttling",
                "RequestTimeout",
                "SlowDown",
            ]:
                current_app.logger.exception(
                    f"Retrying job fetch {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
                )
                retries += 1
                sleep_time = backoff_factor * (2**retries)  # Exponential backoff
                time.sleep(sleep_time)
                continue
            else:
                # Typically this is "NoSuchKey"
                current_app.logger.exception(
                    f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
                )
                return None

        except Exception:
            current_app.logger.exception(
                f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
            )
            return None

    current_app.logger.error(
        f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
    )
    return None


def extract_phones(job):
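    """
    Parse a job CSV (passed as a single string) and return a dict mapping row
    number to the phone number in that row, or "Unavailable" when the row is
    missing the phone number column.
    """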
    job = job.split("\r\n")
    first_row = job[0]
    job.pop(0)
    first_row = first_row.split(",")
    phone_index = 0
    for item in first_row:
        # Note: may contain a BOM and look like \ufeffphone number
        if "phone number" in item.lower():
            break
        phone_index = phone_index + 1

    phones = {}
    job_row = 0
    for row in job:
        row = row.split(",")

        if phone_index >= len(row):
            phones[job_row] = "Unavailable"
            current_app.logger.error(
                "Corrupt csv file, missing columns or possibly a byte order mark in the file",
            )

        else:
            my_phone = row[phone_index]
            my_phone = re.sub(r"[\+\s\(\)\-\.]*", "", my_phone)
            phones[job_row] = my_phone
        job_row = job_row + 1
    return phones


def extract_personalisation(job):
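    """
    Parse a job CSV (passed as a single string) and return a dict mapping row
    number to a dict of column header -> value for that row.
    """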
    job = job.split("\r\n")
    first_row = job[0]
    job.pop(0)
    first_row = first_row.split(",")
    personalisation = {}
    job_row = 0
    for row in job:
        row = row.split(",")
        temp = dict(zip(first_row, row))
        personalisation[job_row] = temp
        job_row = job_row + 1
    return personalisation


def get_phone_number_from_s3(service_id, job_id, job_row_number):
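    """
    Return the phone number for the given row of a job, using job_cache where
    possible and falling back to S3. Returns "Unavailable" if the job or row
    cannot be found.
    """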
    job = job_cache.get(job_id)
    if job is None:
        current_app.logger.info(f"job {job_id} was not in the cache")
        job = get_job_from_s3(service_id, job_id)
        # Even if it is None, put it here to avoid KeyErrors
        set_job_cache(job_cache, job_id, job)
    else:
        # skip expiration date from cache, we don't need it here
        job = job[0]

    if job is None:
        current_app.logger.error(
            f"Couldn't find phone for job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} because job is missing"
        )
        return "Unavailable"

    # If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
    # and that dictionary is not there, create it
    if job_cache.get(f"{job_id}_phones") is None:
        phones = extract_phones(job)
        set_job_cache(job_cache, f"{job_id}_phones", phones)
    else:
        # skip expiration date from cache, we don't need it here
        phones = job_cache.get(f"{job_id}_phones")[0]

    # If we can find the quick dictionary, use it
    phone_to_return = phones.get(job_row_number)
    if phone_to_return:
        return phone_to_return
    else:
        current_app.logger.warning(
            f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
        )
        return "Unavailable"


def get_personalisation_from_s3(service_id, job_id, job_row_number):
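    """
    Return the personalisation dict for the given row of a job, using
    job_cache where possible and falling back to S3. Returns {} if the job or
    row cannot be found.
    """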
    # We don't want to constantly pull down a job from s3 every time we need the personalisation.
    # At the same time we don't want to store it in redis or the db
    # So this is a little recycling mechanism to reduce the number of downloads.
    job = job_cache.get(job_id)
    if job is None:
        job = get_job_from_s3(service_id, job_id)
        # Even if it is None, put it here to avoid KeyErrors
        set_job_cache(job_cache, job_id, job)
    else:
        # skip expiration date from cache, we don't need it here
        job = job[0]

    # If the job is None after our attempt to retrieve it from s3, it
    # probably means the job is old and has been deleted from s3, in
    # which case there is nothing we can do. It's unlikely to run into
    # this, but it could theoretically happen, especially if we ever
    # change the task schedules
    if job is None:
        current_app.logger.warning(
            f"Couldn't find personalisation for job_id {job_id} row number {job_row_number} because job is missing"
        )
        return {}

    # If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
    # and that dictionary is not there, create it
    if job_cache.get(f"{job_id}_personalisation") is None:
        set_job_cache(
            job_cache, f"{job_id}_personalisation", extract_personalisation(job)
        )

    # If we can find the quick dictionary, use it
    if job_cache.get(f"{job_id}_personalisation") is not None:
        personalisation_to_return = job_cache.get(f"{job_id}_personalisation")[0].get(
            job_row_number
        )
        if personalisation_to_return:
            return personalisation_to_return
        else:
            current_app.logger.warning(
                f"Was unable to retrieve personalisation from lookup dictionary for job {job_id}"
            )
            return {}
    else:
        current_app.logger.error(
            f"Was unable to construct lookup dictionary for job {job_id}"
        )
        return {}


def get_job_metadata_from_s3(service_id, job_id):
    obj = get_s3_object(*get_job_location(service_id, job_id))
    return obj.get()["Metadata"]


def remove_job_from_s3(service_id, job_id):
    return remove_s3_object(*get_job_location(service_id, job_id))


def remove_s3_object(bucket_name, object_key, access_key, secret_key, region):
    obj = get_s3_object(bucket_name, object_key, access_key, secret_key, region)
    return obj.delete()


def remove_csv_object(object_key):
    obj = get_s3_object(
        current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
        object_key,
        current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
        current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
        current_app.config["CSV_UPLOAD_BUCKET"]["region"],
    )
    return obj.delete()