mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-14 01:02:09 -05:00
improve report performance
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
import datetime
|
import datetime
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
import botocore
|
import botocore
|
||||||
from boto3 import Session
|
from boto3 import Session
|
||||||
@@ -115,34 +116,57 @@ def cleanup_old_s3_objects():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_s3_files():
|
def read_s3_file(bucket_name, object_key, s3res):
|
||||||
|
"""
|
||||||
|
This method runs during the 'regenerate job cache' task.
|
||||||
|
Note that in addition to retrieving the jobs and putting them
|
||||||
|
into the cache, this method also does some pre-processing by
|
||||||
|
putting a list of all phone numbers into the cache as well.
|
||||||
|
|
||||||
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
|
This means that when the report needs to be regenerated, it
|
||||||
objects = list_s3_objects()
|
can easily find the phone numbers in the cache through JOBS[<job_id>_phones]
|
||||||
|
and the personalization through JOBS[<job_id>_personalisation], which
|
||||||
|
in theory should make report generation a lot faster.
|
||||||
|
|
||||||
s3res = get_s3_resource()
|
We are moving processing from the front end where the user can see it
|
||||||
current_app.logger.info(
|
in wait time, to this back end process.
|
||||||
f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
|
"""
|
||||||
)
|
|
||||||
for object in objects:
|
|
||||||
# We put our csv files in the format "service-{service_id}-notify/{job_id}"
|
|
||||||
try:
|
try:
|
||||||
object_arr = object.split("/")
|
|
||||||
|
object_arr = object_key.split("/")
|
||||||
job_id = object_arr[1] # get the job_id
|
job_id = object_arr[1] # get the job_id
|
||||||
job_id = job_id.replace(".csv", "") # we just want the job_id
|
job_id = job_id.replace(".csv", "") # we just want the job_id
|
||||||
if JOBS.get(job_id) is None:
|
if JOBS.get(job_id) is None:
|
||||||
object = (
|
object = (
|
||||||
s3res.Object(bucket_name, object)
|
s3res.Object(bucket_name, object_key)
|
||||||
.get()["Body"]
|
.get()["Body"]
|
||||||
.read()
|
.read()
|
||||||
.decode("utf-8")
|
.decode("utf-8")
|
||||||
)
|
)
|
||||||
if "phone number" in object.lower():
|
if "phone number" in object.lower():
|
||||||
JOBS[job_id] = object
|
JOBS[job_id] = object
|
||||||
|
JOBS[f"{job_id}_phones"] = extract_phones(object)
|
||||||
|
JOBS[f"{job_id}_personalisation"] = extract_personalisation(object)
|
||||||
except LookupError:
|
except LookupError:
|
||||||
# perhaps our key is not formatted as we expected. If so skip it.
|
# perhaps our key is not formatted as we expected. If so skip it.
|
||||||
current_app.logger.exception("LookupError #notify-admin-1200")
|
current_app.logger.exception("LookupError #notify-admin-1200")
|
||||||
|
|
||||||
|
|
||||||
|
def get_s3_files():
|
||||||
|
"""
|
||||||
|
We're using the ThreadPoolExecutor here to speed up the retrieval of S3
|
||||||
|
csv files for scaling needs.
|
||||||
|
"""
|
||||||
|
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
|
||||||
|
object_keys = list_s3_objects()
|
||||||
|
|
||||||
|
s3res = get_s3_resource()
|
||||||
|
current_app.logger.info(
|
||||||
|
f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
|
||||||
|
)
|
||||||
|
with ThreadPoolExecutor() as executor:
|
||||||
|
executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
|
||||||
|
|
||||||
current_app.logger.info(
|
current_app.logger.info(
|
||||||
f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
|
f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -256,7 +256,7 @@ class Config(object):
|
|||||||
},
|
},
|
||||||
"regenerate-job-cache": {
|
"regenerate-job-cache": {
|
||||||
"task": "regenerate-job-cache",
|
"task": "regenerate-job-cache",
|
||||||
"schedule": crontab(minute="*/30"),
|
"schedule": crontab(minute="*/3"),
|
||||||
"options": {"queue": QueueNames.PERIODIC},
|
"options": {"queue": QueueNames.PERIODIC},
|
||||||
},
|
},
|
||||||
"regenerate-job-cache-on-startup": {
|
"regenerate-job-cache-on-startup": {
|
||||||
|
|||||||
Reference in New Issue
Block a user