mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 02:11:11 -05:00
fix ExpiringDict caching solution
This commit is contained in:
@@ -252,6 +252,7 @@ def register_blueprint(application):
|
||||
|
||||
|
||||
def init_app(app):
|
||||
|
||||
@app.before_request
|
||||
def record_request_details():
|
||||
g.start = monotonic()
|
||||
|
||||
@@ -19,6 +19,65 @@ JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
|
||||
JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"
|
||||
|
||||
|
||||
def list_s3_objects():
|
||||
print("ENTER LIST_S3_OBJECTS")
|
||||
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
|
||||
access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
|
||||
secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
|
||||
region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
|
||||
session = Session(
|
||||
aws_access_key_id=access_key,
|
||||
aws_secret_access_key=secret_key,
|
||||
region_name=region,
|
||||
)
|
||||
s3 = session.client("s3")
|
||||
|
||||
objects = []
|
||||
try:
|
||||
response = s3.list_objects_v2(Bucket=bucket_name)
|
||||
while True:
|
||||
for obj in response.get("Contents", []):
|
||||
objects.append(obj["Key"])
|
||||
if "NextContinuationToken" in response:
|
||||
response = s3.list_objects_v2(
|
||||
Bucket=bucket_name,
|
||||
ContinuationToken=response["NextContinuationToken"],
|
||||
)
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
current_app.logger.error(f"An error occurred while regenerating cache {e}")
|
||||
return objects
|
||||
|
||||
|
||||
def get_s3_files():
|
||||
print("ENTER GET_S3_FILES")
|
||||
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
|
||||
access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"]
|
||||
secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"]
|
||||
region = current_app.config["CSV_UPLOAD_BUCKET"]["region"]
|
||||
session = Session(
|
||||
aws_access_key_id=access_key,
|
||||
aws_secret_access_key=secret_key,
|
||||
region_name=region,
|
||||
)
|
||||
objects = list_s3_objects()
|
||||
|
||||
s3res = session.resource("s3", config=AWS_CLIENT_CONFIG)
|
||||
print(f"LEN JOBS BEFORE LOAD {len(JOBS)}")
|
||||
for object in objects:
|
||||
object_arr = object.split("/")
|
||||
job_id = object_arr[1]
|
||||
job_id = job_id.replace(".csv", "")
|
||||
if JOBS.get(job_id) is None:
|
||||
object = (
|
||||
s3res.Object(bucket_name, object).get()["Body"].read().decode("utf-8")
|
||||
)
|
||||
if "phone number" in object.lower():
|
||||
JOBS[job_id] = object
|
||||
print(f"NOW LEN JOBS IS {len(JOBS)} object {object}")
|
||||
|
||||
|
||||
def get_s3_file(bucket_name, file_location, access_key, secret_key, region):
|
||||
s3_file = get_s3_object(bucket_name, file_location, access_key, secret_key, region)
|
||||
return s3_file.get()["Body"].read().decode("utf-8")
|
||||
@@ -146,6 +205,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
|
||||
if job is None:
|
||||
job = get_job_from_s3(service_id, job_id)
|
||||
JOBS[job_id] = job
|
||||
print(f"CACHE MISS FOR JOB_ID {job_id}")
|
||||
incr_jobs_cache_misses()
|
||||
else:
|
||||
incr_jobs_cache_hits()
|
||||
|
||||
@@ -441,6 +441,12 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id):
|
||||
)
|
||||
|
||||
|
||||
@notify_celery.task(name="regenerate-job-cache")
|
||||
def regenerate_job_cache():
|
||||
print("ENTER REGENERATE_JOB_CACHE")
|
||||
s3.get_s3_files()
|
||||
|
||||
|
||||
@notify_celery.task(name="process-incomplete-jobs")
|
||||
def process_incomplete_jobs(job_ids):
|
||||
jobs = [dao_get_job_by_id(job_id) for job_id in job_ids]
|
||||
|
||||
@@ -249,6 +249,11 @@ class Config(object):
|
||||
"schedule": crontab(hour=6, minute=0),
|
||||
"options": {"queue": QueueNames.PERIODIC},
|
||||
},
|
||||
"regenerate-job-cache": {
|
||||
"task": "regenerate-job-cache",
|
||||
"schedule": crontab(),
|
||||
"options": {"queue": QueueNames.PERIODIC},
|
||||
},
|
||||
"cleanup-unfinished-jobs": {
|
||||
"task": "cleanup-unfinished-jobs",
|
||||
"schedule": crontab(hour=4, minute=5),
|
||||
|
||||
Reference in New Issue
Block a user