diff --git a/app/aws/s3.py b/app/aws/s3.py
index 52e2a5eb1..3f2af6183 100644
--- a/app/aws/s3.py
+++ b/app/aws/s3.py
@@ -1,6 +1,7 @@
 import datetime
 import re
 import time
+from concurrent.futures import ThreadPoolExecutor
 
 import botocore
 from boto3 import Session
@@ -115,33 +116,56 @@ def cleanup_old_s3_objects():
     )
 
 
-def get_s3_files():
+def read_s3_file(bucket_name, object_key, s3res):
+    """
+    This method runs during the 'regenerate job cache' task.
+    In addition to retrieving each job's csv and putting it into
+    the cache, it does some pre-processing: it also caches the
+    job's phone numbers and personalisation fields. When a report
+    needs to be regenerated, the phone numbers can be found in the
+    cache through JOBS[f"{job_id}_phones"] and the personalisation
+    through JOBS[f"{job_id}_personalisation"], which should make
+    report generation much faster.
+
+    We are moving this processing from the front end, where the user
+    experiences it as wait time, to this back end process.
+    """
+    try:
+        # Our csv files are in the format "service-{service_id}-notify/{job_id}"
+        object_arr = object_key.split("/")
+        job_id = object_arr[1]  # get the job_id
+        job_id = job_id.replace(".csv", "")  # we just want the job_id
+        if JOBS.get(job_id) is None:
+            object = (
+                s3res.Object(bucket_name, object_key)
+                .get()["Body"]
+                .read()
+                .decode("utf-8")
+            )
+            if "phone number" in object.lower():
+                JOBS[job_id] = object
+                JOBS[f"{job_id}_phones"] = extract_phones(object)
+                JOBS[f"{job_id}_personalisation"] = extract_personalisation(object)
+    except LookupError:
+        # The key may not be formatted as we expect; if so, skip it.
+        current_app.logger.exception("LookupError #notify-admin-1200")
+
+
+def get_s3_files():
+    """
+    Use a ThreadPoolExecutor to retrieve the S3 csv files concurrently;
+    the reads are I/O-bound, so this speeds up cache regeneration.
+    """
     bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
-    objects = list_s3_objects()
+    object_keys = list_s3_objects()
     s3res = get_s3_resource()
     current_app.logger.info(
         f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
     )
-    for object in objects:
-        # We put our csv files in the format "service-{service_id}-notify/{job_id}"
-        try:
-            object_arr = object.split("/")
-            job_id = object_arr[1]  # get the job_id
-            job_id = job_id.replace(".csv", "")  # we just want the job_id
-            if JOBS.get(job_id) is None:
-                object = (
-                    s3res.Object(bucket_name, object)
-                    .get()["Body"]
-                    .read()
-                    .decode("utf-8")
-                )
-                if "phone number" in object.lower():
-                    JOBS[job_id] = object
-        except LookupError:
-            # perhaps our key is not formatted as we expected. If so skip it.
-            current_app.logger.exception("LookupError #notify-admin-1200")
+    with ThreadPoolExecutor() as executor:
+        executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
 
     current_app.logger.info(
         f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
diff --git a/app/config.py b/app/config.py
index 71fa4ed23..9a4412615 100644
--- a/app/config.py
+++ b/app/config.py
@@ -256,7 +256,7 @@ class Config(object):
         },
         "regenerate-job-cache": {
             "task": "regenerate-job-cache",
-            "schedule": crontab(minute="*/30"),
+            "schedule": crontab(minute="*/3"),
             "options": {"queue": QueueNames.PERIODIC},
         },
         "regenerate-job-cache-on-startup": {
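
Note: `read_s3_file` depends on `extract_phones` and `extract_personalisation`, which are not shown in this diff. As a rough sketch of the shape of such helpers, assuming the cached object is raw csv text with a "phone number" column (these are illustrative stand-ins, not the project's actual implementations):

```python
import csv
from io import StringIO


def extract_phones(file_contents):
    """Illustrative only: collect the 'phone number' column from raw csv text."""
    reader = csv.DictReader(StringIO(file_contents))
    return [row.get("phone number") for row in reader]


def extract_personalisation(file_contents):
    """Illustrative only: collect every column except the phone number."""
    reader = csv.DictReader(StringIO(file_contents))
    return [
        {field: value for field, value in row.items() if field != "phone number"}
        for row in reader
    ]
```

Whatever their real shape, the point of the pre-processing is that the cache stores the parsed phone numbers and personalisation once, instead of re-parsing the csv on every report generation.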
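
One caveat with the `executor.map(...)` call in `get_s3_files`: all tasks still run to completion (the `with` block waits for them on exit), but any exception raised inside `read_s3_file` other than the `LookupError` it catches, e.g. a `botocore` error on the `get()`, is stored on its future and silently discarded, because the iterator returned by `map` is never consumed. A defensive variant, sketched with the same names the diff uses (not part of the PR; the `max_workers` value is an arbitrary illustration):

```python
from concurrent.futures import ThreadPoolExecutor


def get_s3_files():
    bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
    object_keys = list_s3_objects()
    s3res = get_s3_resource()

    # An explicit cap keeps the number of concurrent S3 reads predictable;
    # the default is min(32, os.cpu_count() + 4).
    with ThreadPoolExecutor(max_workers=8) as executor:
        results = executor.map(
            lambda key: read_s3_file(bucket_name, key, s3res), object_keys
        )
        # Drain the iterator so exceptions raised in worker threads
        # propagate here instead of being discarded.
        for _ in results:
            pass
```

Iterating `results` re-raises the first worker exception at the call site; if cache regeneration should instead continue past individual failures, catching and logging broader exceptions inside `read_s3_file` gives the same visibility.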