merge from main

Kenneth Kehl
2024-09-27 09:43:10 -07:00
8 changed files with 98 additions and 75 deletions


@@ -2,13 +2,12 @@ import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager
import botocore
from boto3 import Session
from expiringdict import ExpiringDict
from flask import current_app
from app import redis_store
from app.clients import AWS_CLIENT_CONFIG
from notifications_utils import aware_utcnow
@@ -16,17 +15,30 @@ FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"
# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7
JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)
manager = Manager()
job_cache = manager.dict()
JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"
# Global variable
s3_client = None
s3_resource = None
def set_job_cache(job_cache, key, value):
job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
def clean_cache():
current_time = time.time()
keys_to_delete = []
for key, (_, expiry_time) in job_cache.items():
if expiry_time < current_time:
keys_to_delete.append(key)
for key in keys_to_delete:
del job_cache[key]
def get_s3_client():
global s3_client
if s3_client is None:
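
The hunk above moves the job cache onto a multiprocessing.Manager dict (shareable across worker processes), storing each entry as a (value, expiry) tuple with an 8-day TTL and sweeping expired keys in clean_cache. A minimal runnable sketch of that pattern; the key and value are illustrative, and clean_cache is parameterised here for the sake of the example:

import time
from multiprocessing import Manager

def set_job_cache(cache, key, value):
    # Store the value alongside an absolute expiry timestamp eight days out.
    cache[key] = (value, time.time() + 8 * 24 * 60 * 60)

def clean_cache(cache):
    # Sweep entries whose expiry timestamp has already passed.
    now = time.time()
    expired = [key for key, (_, expiry) in cache.items() if expiry < now]
    for key in expired:
        del cache[key]

if __name__ == "__main__":
    manager = Manager()            # proxy dict shared between worker processes
    job_cache = manager.dict()
    set_job_cache(job_cache, "job-1_phones", {0: "+15555550100"})
    clean_cache(job_cache)
    value, expiry = job_cache["job-1_phones"]   # entries are (value, expiry) tuples
    print(value, expiry)
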
@@ -88,7 +100,6 @@ def get_bucket_name():
def cleanup_old_s3_objects():
bucket_name = get_bucket_name()
s3_client = get_s3_client()
@@ -100,9 +111,15 @@ def cleanup_old_s3_objects():
while True:
for obj in response.get("Contents", []):
if obj["LastModified"] <= time_limit:
current_app.logger.info(
f"#delete-old-s3-objects Wanting to delete: {obj['LastModified']} {obj['Key']}"
)
try:
remove_csv_object(obj["Key"])
current_app.logger.info(
f"#delete-old-s3-objects Deleted: {obj['LastModified']} {obj['Key']}"
)
except botocore.exceptions.ClientError:
current_app.logger.exception(f"Couldn't delete {obj['Key']}")
if "NextContinuationToken" in response:
response = s3_client.list_objects_v2(
Bucket=bucket_name,
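
The loop above pages through the bucket with list_objects_v2, deleting anything older than the time limit before following NextContinuationToken to the next page. A standalone sketch of that pagination-and-delete pattern; the bucket name and cut-off are illustrative, and delete_object is called directly here instead of remove_csv_object:

import datetime
import boto3

def cleanup_old_objects(bucket_name, days=14):
    s3_client = boto3.client("s3")
    time_limit = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=days)
    response = s3_client.list_objects_v2(Bucket=bucket_name)
    while True:
        for obj in response.get("Contents", []):
            if obj["LastModified"] <= time_limit:
                s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
        if "NextContinuationToken" not in response:
            break
        response = s3_client.list_objects_v2(
            Bucket=bucket_name,
            ContinuationToken=response["NextContinuationToken"],
        )
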
@@ -131,8 +148,8 @@ def read_s3_file(bucket_name, object_key, s3res):
putting a list of all phone numbers into the cache as well.
This means that when the report needs to be regenerated, it
can easily find the phone numbers in the cache through JOBS[<job_id>_phones]
and the personalization through JOBS[<job_id>_personalisation], which
can easily find the phone numbers in the cache through job_cache[<job_id>_phones]
and the personalization through job_cache[<job_id>_personalisation], which
in theory should make report generation a lot faster.
We are moving processing from the front end where the user can see it
@@ -140,7 +157,7 @@ def read_s3_file(bucket_name, object_key, s3res):
"""
try:
job_id = get_job_id_from_s3_object_key(object_key)
if JOBS.get(job_id) is None:
if job_cache.get(job_id) is None:
object = (
s3res.Object(bucket_name, object_key)
.get()["Body"]
@@ -148,9 +165,13 @@ def read_s3_file(bucket_name, object_key, s3res):
.decode("utf-8")
)
if "phone number" in object.lower():
JOBS[job_id] = object
JOBS[f"{job_id}_phones"] = extract_phones(object)
JOBS[f"{job_id}_personalisation"] = extract_personalisation(object)
set_job_cache(job_cache, job_id, object)
set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
set_job_cache(
job_cache,
f"{job_id}_personalisation",
extract_personalisation(object),
)
except LookupError:
# perhaps our key is not formatted as we expected. If so skip it.
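
For orientation, this is roughly what the block above leaves in the cache for a single job. The shapes of the phones and personalisation entries are inferred from how the lookup code further down indexes them; the job id, rows, and expiry value are illustrative, and a plain dict stands in for the Manager proxy:

expiry = 1_900_000_000.0  # illustrative absolute timestamp, as produced by set_job_cache
job_cache = {
    "job-1": ("phone number,name\r\n+15555550100,Alice", expiry),
    "job-1_phones": ({0: "+15555550100"}, expiry),
    "job-1_personalisation": ({0: {"name": "Alice"}}, expiry),
}
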
@@ -167,13 +188,13 @@ def get_s3_files():
s3res = get_s3_resource()
current_app.logger.info(
f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
)
with ThreadPoolExecutor() as executor:
executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
current_app.logger.info(
f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
)
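
get_s3_files fans the per-key reads out over a thread pool, which suits an I/O-bound call like read_s3_file. A standalone sketch of that fan-out; the bucket name, keys, and reader body are placeholders for the real read_s3_file call:

from concurrent.futures import ThreadPoolExecutor

def read_one(bucket_name, key):
    # Stand-in for read_s3_file(bucket_name, key, s3res).
    print(f"would read s3://{bucket_name}/{key}")

object_keys = ["service-1-notify/job-a.csv", "service-1-notify/job-b.csv"]
with ThreadPoolExecutor() as executor:
    # Consuming the iterator surfaces any exception raised in a worker;
    # the diff leaves it unconsumed and relies on read_s3_file handling its own errors.
    list(executor.map(lambda key: read_one("notify-bucket", key), object_keys))
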
@@ -311,20 +332,6 @@ def get_job_from_s3(service_id, job_id):
return None
def incr_jobs_cache_misses():
if not redis_store.get(JOBS_CACHE_MISSES):
redis_store.set(JOBS_CACHE_MISSES, 1)
else:
redis_store.incr(JOBS_CACHE_MISSES)
def incr_jobs_cache_hits():
if not redis_store.get(JOBS_CACHE_HITS):
redis_store.set(JOBS_CACHE_HITS, 1)
else:
redis_store.incr(JOBS_CACHE_HITS)
def extract_phones(job):
job = job.split("\r\n")
first_row = job[0]
@@ -333,7 +340,7 @@ def extract_phones(job):
phone_index = 0
for item in first_row:
# Note: may contain a BOM and look like \ufeffphone number
if "phone number" in item.lower():
if item.lower() in ["phone number", "\\ufeffphone number"]:
break
phone_index = phone_index + 1
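
The loop above walks the header row of the job CSV to find the phone column; as the comment notes, the first header cell can arrive prefixed with a UTF-8 byte-order mark. A small standalone version of that scan which strips a BOM explicitly rather than listing both spellings (the sample header row is made up):

first_row = "\ufeffphone number,name,day of week".split(",")

phone_index = 0
for item in first_row:
    # lstrip("\ufeff") drops a leading byte-order mark when one is present.
    if item.lstrip("\ufeff").lower() == "phone number":
        break
    phone_index = phone_index + 1

print(phone_index)  # -> 0
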
@@ -357,7 +364,8 @@ def extract_phones(job):
def extract_personalisation(job):
job = job.split("\r\n")
job = job[0].split("\r\n")
first_row = job[0]
job.pop(0)
first_row = first_row.split(",")
@@ -372,18 +380,15 @@ def extract_personalisation(job):
def get_phone_number_from_s3(service_id, job_id, job_row_number):
# We don't want to constantly pull down a job from s3 every time we need a phone number.
# At the same time we don't want to store it in redis or the db
# So this is a little recycling mechanism to reduce the number of downloads.
job = JOBS.get(job_id)
job = job_cache.get(job_id)
if job is None:
current_app.logger.info(f"job {job_id} was not in the cache")
job = get_job_from_s3(service_id, job_id)
# Even if it is None, put it here to avoid KeyErrors
JOBS[job_id] = job
incr_jobs_cache_misses()
set_job_cache(job_cache, job_id, job)
else:
incr_jobs_cache_hits()
# skip expiration date from cache, we don't need it here
job = job[0]
if job is None:
current_app.logger.error(
@@ -391,24 +396,19 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
)
return "Unavailable"
# If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
# If we look in the job_cache for the quick lookup dictionary of phones for a given job
# and that dictionary is not there, create it
if JOBS.get(f"{job_id}_phones") is None:
JOBS[f"{job_id}_phones"] = extract_phones(job)
if job_cache.get(f"{job_id}_phones") is None:
phones = extract_phones(job)
set_job_cache(job_cache, f"{job_id}_phones", phones)
# If we can find the quick dictionary, use it
if JOBS.get(f"{job_id}_phones") is not None:
phone_to_return = JOBS.get(f"{job_id}_phones").get(job_row_number)
if phone_to_return:
return phone_to_return
else:
current_app.logger.warning(
f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
)
return "Unavailable"
phone_to_return = phones[job_row_number]
if phone_to_return:
return phone_to_return
else:
current_app.logger.error(
f"Was unable to construct lookup dictionary for job {job_id}"
current_app.logger.warning(
f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
)
return "Unavailable"
@@ -417,13 +417,10 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
# We don't want to constantly pull down a job from s3 every time we need the personalisation.
# At the same time we don't want to store it in redis or the db
# So this is a little recycling mechanism to reduce the number of downloads.
job = JOBS.get(job_id)
job = job_cache.get(job_id)
if job is None:
job = get_job_from_s3(service_id, job_id)
JOBS[job_id] = job
incr_jobs_cache_misses()
else:
incr_jobs_cache_hits()
set_job_cache(job_cache, job_id, job)
# If the job is None after our attempt to retrieve it from s3, it
# probably means the job is old and has been deleted from s3, in
@@ -436,14 +433,16 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
)
return {}
# If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
# If we look in the job_cache for the quick lookup dictionary of personalisations for a given job
# and that dictionary is not there, create it
if JOBS.get(f"{job_id}_personalisation") is None:
JOBS[f"{job_id}_personalisation"] = extract_personalisation(job)
if job_cache.get(f"{job_id}_personalisation") is None:
set_job_cache(
job_cache, f"{job_id}_personalisation", extract_personalisation(job)
)
# If we can find the quick dictionary, use it
if JOBS.get(f"{job_id}_personalisation") is not None:
personalisation_to_return = JOBS.get(f"{job_id}_personalisation").get(
if job_cache.get(f"{job_id}_personalisation") is not None:
personalisation_to_return = job_cache.get(f"{job_id}_personalisation")[0].get(
job_row_number
)
if personalisation_to_return:

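Illustrative call sites for the two getters above; the service id, job id, and row number are made up:

phone = get_phone_number_from_s3("svc-1", "job-1", 2)                 # "Unavailable" when the row or job cannot be found
personalisation = get_personalisation_from_s3("svc-1", "job-1", 2)    # {} when the job has aged out of S3
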

@@ -446,6 +446,11 @@ def regenerate_job_cache():
s3.get_s3_files()
@notify_celery.task(name="clean-job-cache")
def clean_job_cache():
s3.clean_cache()
@notify_celery.task(name="delete-old-s3-objects")
def delete_old_s3_objects():
s3.cleanup_old_s3_objects()
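
These tasks are thin wrappers around the s3 helpers, so they can also be kicked off ad hoc (for example from a flask shell) while testing; the queue name below is illustrative rather than the real QueueNames.PERIODIC value:

clean_job_cache.apply_async(queue="periodic")   # or clean_job_cache.delay() to use the default routing
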


@@ -251,12 +251,12 @@ class Config(object):
},
"delete_old_s3_objects": {
"task": "delete-old-s3-objects",
"schedule": crontab(minute="*/5"),
"schedule": crontab(hour=7, minute=10),
"options": {"queue": QueueNames.PERIODIC},
},
"regenerate-job-cache": {
"task": "regenerate-job-cache",
"schedule": crontab(minute="*/3"),
"schedule": crontab(minute="*/30"),
"options": {"queue": QueueNames.PERIODIC},
},
"regenerate-job-cache-on-startup": {
@@ -269,6 +269,11 @@ class Config(object):
"expires": 60,
}, # Ensure it doesn't run if missed
},
"clean-job-cache": {
"task": "clean-job-cache",
"schedule": crontab(hour=2, minute=11),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-unfinished-jobs": {
"task": "cleanup-unfinished-jobs",
"schedule": crontab(hour=4, minute=5),