use shared memory instead of expiring dict for jobs cache

Kenneth Kehl
2024-09-26 11:56:39 -07:00
parent d27d877f68
commit 8aae0d59b4
3 changed files with 50 additions and 44 deletions
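The gist of the change, stated outside the diff: the per-process ExpiringDict is replaced with a multiprocessing.Manager dict that worker processes can share, each entry is stored as a (value, expiry timestamp) tuple, and a scheduled task sweeps out stale keys. Below is a minimal, self-contained sketch of that pattern; the helper names (set_entry, get_entry, sweep) are illustrative, not the repo's.

import time
from multiprocessing import Manager

TTL_SECONDS = 8 * 24 * 60 * 60  # mirrors the 8-day expiry used in the diff


def set_entry(cache, key, value, ttl=TTL_SECONDS):
    # Store the value alongside its absolute expiry time.
    cache[key] = (value, time.time() + ttl)


def get_entry(cache, key):
    # Return the cached value, or None if the key is missing.
    entry = cache.get(key)
    return None if entry is None else entry[0]


def sweep(cache):
    # Remove every entry whose expiry time has passed.
    now = time.time()
    for key in [k for k, (_, expires) in cache.items() if expires < now]:
        del cache[key]


if __name__ == "__main__":
    manager = Manager()     # one server process owns the shared dict
    cache = manager.dict()  # proxies to it can be used from any worker
    set_entry(cache, "job-123", "phone number\r\n+15555550100", ttl=1)
    print(get_entry(cache, "job-123"))  # the cached CSV text
    time.sleep(1.5)
    sweep(cache)
    print(cache.get("job-123"))  # None once the sweep has run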

View File

@@ -1,31 +1,45 @@
 import datetime
 import re
 import time
+from multiprocessing import Manager

 import botocore
 from boto3 import Session
-from expiringdict import ExpiringDict
 from flask import current_app

-from app import redis_store
 from app.clients import AWS_CLIENT_CONFIG
+from app.utils import hilite
 from notifications_utils import aware_utcnow

 FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"

 # Temporarily extend cache to 7 days
 ttl = 60 * 60 * 24 * 7
-JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)
-
-JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
-JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"
+manager = Manager()
+job_cache = manager.dict()

 # Global variable
 s3_client = None
 s3_resource = None


+def set_job_cache(job_cache, key, value):
+    job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
+
+
+def clean_cache():
+    current_time = time.time()
+    keys_to_delete = []
+    for key, (_, expiry_time) in job_cache.items():
+        if expiry_time < current_time:
+            keys_to_delete.append(key)
+
+    for key in keys_to_delete:
+        print(hilite(f"DELETING {key}"))
+        del job_cache[key]
+
+
 def get_s3_client():
     global s3_client
     if s3_client is None:
@@ -127,7 +141,7 @@ def get_s3_files():
     s3res = get_s3_resource()
     current_app.logger.info(
-        f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
+        f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
     )
     for object in objects:
         # We put our csv files in the format "service-{service_id}-notify/{job_id}"
@@ -135,7 +149,7 @@ def get_s3_files():
             object_arr = object.split("/")
             job_id = object_arr[1]  # get the job_id
             job_id = job_id.replace(".csv", "")  # we just want the job_id
-            if JOBS.get(job_id) is None:
+            if job_cache.get(job_id) is None:
                 object = (
                     s3res.Object(bucket_name, object)
                     .get()["Body"]
@@ -143,13 +157,13 @@ def get_s3_files():
                     .decode("utf-8")
                 )
                 if "phone number" in object.lower():
-                    JOBS[job_id] = object
+                    set_job_cache(job_cache, job_id, object)
         except LookupError:
             # perhaps our key is not formatted as we expected. If so skip it.
             current_app.logger.exception("LookupError #notify-admin-1200")

     current_app.logger.info(
-        f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
+        f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
     )
@@ -287,22 +301,8 @@ def get_job_from_s3(service_id, job_id):
     return None


-def incr_jobs_cache_misses():
-    if not redis_store.get(JOBS_CACHE_MISSES):
-        redis_store.set(JOBS_CACHE_MISSES, 1)
-    else:
-        redis_store.incr(JOBS_CACHE_MISSES)
-
-
-def incr_jobs_cache_hits():
-    if not redis_store.get(JOBS_CACHE_HITS):
-        redis_store.set(JOBS_CACHE_HITS, 1)
-    else:
-        redis_store.incr(JOBS_CACHE_HITS)
-
-
 def extract_phones(job):
-    job = job.split("\r\n")
+    job = job[0].split("\r\n")
     first_row = job[0]
     job.pop(0)
     first_row = first_row.split(",")
@@ -333,7 +333,7 @@ def extract_phones(job):


 def extract_personalisation(job):
-    job = job.split("\r\n")
+    job = job[0].split("\r\n")
     first_row = job[0]
     job.pop(0)
     first_row = first_row.split(",")
@@ -351,15 +351,12 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
     # We don't want to constantly pull down a job from s3 every time we need a phone number.
     # At the same time we don't want to store it in redis or the db
     # So this is a little recycling mechanism to reduce the number of downloads.
-    job = JOBS.get(job_id)
+    job = job_cache.get(job_id)
     if job is None:
         current_app.logger.info(f"job {job_id} was not in the cache")
         job = get_job_from_s3(service_id, job_id)
         # Even if it is None, put it here to avoid KeyErrors
-        JOBS[job_id] = job
-        incr_jobs_cache_misses()
-    else:
-        incr_jobs_cache_hits()
+        set_job_cache(job_cache, job_id, job)

     if job is None:
         current_app.logger.error(
@@ -369,12 +366,12 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
     # If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
     # and that dictionary is not there, create it
-    if JOBS.get(f"{job_id}_phones") is None:
-        JOBS[f"{job_id}_phones"] = extract_phones(job)
+    if job_cache.get(f"{job_id}_phones") is None:
+        set_job_cache(job_cache, f"{job_id}_phones", extract_phones(job))

     # If we can find the quick dictionary, use it
-    if JOBS.get(f"{job_id}_phones") is not None:
-        phone_to_return = JOBS.get(f"{job_id}_phones").get(job_row_number)
+    if job_cache.get(f"{job_id}_phones") is not None:
+        phone_to_return = job_cache.get(f"{job_id}_phones")[0].get(job_row_number)
         if phone_to_return:
             return phone_to_return
         else:
@@ -393,13 +390,10 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
     # We don't want to constantly pull down a job from s3 every time we need the personalisation.
     # At the same time we don't want to store it in redis or the db
     # So this is a little recycling mechanism to reduce the number of downloads.
-    job = JOBS.get(job_id)
+    job = job_cache.get(job_id)
     if job is None:
         job = get_job_from_s3(service_id, job_id)
-        JOBS[job_id] = job
-        incr_jobs_cache_misses()
-    else:
-        incr_jobs_cache_hits()
+        set_job_cache(job_cache, job_id, job)

     # If the job is None after our attempt to retrieve it from s3, it
     # probably means the job is old and has been deleted from s3, in
@@ -414,12 +408,14 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
     # If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
     # and that dictionary is not there, create it
-    if JOBS.get(f"{job_id}_personalisation") is None:
-        JOBS[f"{job_id}_personalisation"] = extract_personalisation(job)
+    if job_cache.get(f"{job_id}_personalisation") is None:
+        set_job_cache(
+            job_cache, f"{job_id}_personalisation", extract_personalisation(job)
+        )

     # If we can find the quick dictionary, use it
-    if JOBS.get(f"{job_id}_personalisation") is not None:
-        personalisation_to_return = JOBS.get(f"{job_id}_personalisation").get(
+    if job_cache.get(f"{job_id}_personalisation") is not None:
+        personalisation_to_return = job_cache.get(f"{job_id}_personalisation")[0].get(
             job_row_number
         )
         if personalisation_to_return:
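One detail that explains the [0] indexing in the hunks above: set_job_cache stores a (value, expiry_time) tuple rather than the bare value, so consumers unpack index [0] before using the cached CSV text or lookup dictionary. A tiny illustration of the shape (hypothetical values, not the repo's code):

import time
from multiprocessing import Manager

if __name__ == "__main__":
    job_cache = Manager().dict()
    job_id = "0f3b9c2e"  # hypothetical job id
    # what set_job_cache writes: the payload plus an absolute expiry timestamp
    job_cache[job_id] = ("phone number\r\n+15555550100", time.time() + 8 * 24 * 60 * 60)

    csv_text = job_cache.get(job_id)[0]    # [0] recovers the cached CSV text
    expires_at = job_cache.get(job_id)[1]  # [1] is when clean_cache may drop the key
    print(csv_text, expires_at)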

View File

@@ -446,6 +446,11 @@ def regenerate_job_cache():
     s3.get_s3_files()


+@notify_celery.task(name="clean-job-cache")
+def clean_job_cache():
+    s3.clean_cache()
+
+
 @notify_celery.task(name="delete-old-s3-objects")
 def delete_old_s3_objects():
     s3.cleanup_old_s3_objects()

View File

@@ -269,6 +269,11 @@ class Config(object):
                 "expires": 60,
             },  # Ensure it doesn't run if missed
         },
+        "clean-job-cache": {
+            "task": "clean-job-cache",
+            "schedule": crontab(minute="*/5"),
+            "options": {"queue": QueueNames.PERIODIC},
+        },
         "cleanup-unfinished-jobs": {
             "task": "cleanup-unfinished-jobs",
             "schedule": crontab(hour=4, minute=5),