diff --git a/app/__init__.py b/app/__init__.py index c08c4ae0a..5d10966e8 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -252,6 +252,7 @@ def register_blueprint(application): def init_app(app): + @app.before_request def record_request_details(): g.start = monotonic() diff --git a/app/aws/s3.py b/app/aws/s3.py index 9466e6cce..ebdffddd5 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -19,6 +19,77 @@ JOBS_CACHE_HITS = "JOBS_CACHE_HITS" JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES" +def list_s3_objects(): + bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] + access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"] + secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"] + region = current_app.config["CSV_UPLOAD_BUCKET"]["region"] + session = Session( + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region, + ) + s3 = session.client("s3") + + try: + response = s3.list_objects_v2(Bucket=bucket_name) + while True: + for obj in response.get("Contents", []): + yield obj["Key"] + if "NextContinuationToken" in response: + response = s3.list_objects_v2( + Bucket=bucket_name, + ContinuationToken=response["NextContinuationToken"], + ) + else: + break + except Exception as e: + current_app.logger.error( + f"An error occurred while regenerating cache #notify-admin-1200 {e}" + ) + + +def get_s3_files(): + current_app.logger.info("Regenerate job cache #notify-admin-1200") + bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"] + access_key = current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"] + secret_key = current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"] + region = current_app.config["CSV_UPLOAD_BUCKET"]["region"] + session = Session( + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region, + ) + objects = list_s3_objects() + + s3res = session.resource("s3", config=AWS_CLIENT_CONFIG) + current_app.logger.info( + f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200" + ) + for object in objects: + # We put our csv files in the format "service-{service_id}-notify/{job_id}" + try: + object_arr = object.split("/") + job_id = object_arr[1] # get the job_id + job_id = job_id.replace(".csv", "") # we just want the job_id + if JOBS.get(job_id) is None: + object = ( + s3res.Object(bucket_name, object) + .get()["Body"] + .read() + .decode("utf-8") + ) + if "phone number" in object.lower(): + JOBS[job_id] = object + except LookupError as le: + # perhaps our key is not formatted as we expected. If so skip it. + current_app.logger.error(f"LookupError {le} #notify-admin-1200") + + current_app.logger.info( + f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200" + ) + + def get_s3_file(bucket_name, file_location, access_key, secret_key, region): s3_file = get_s3_object(bucket_name, file_location, access_key, secret_key, region) return s3_file.get()["Body"].read().decode("utf-8") diff --git a/app/celery/tasks.py b/app/celery/tasks.py index f0d036549..e6ed717e7 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -441,6 +441,11 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): ) +@notify_celery.task(name="regenerate-job-cache") +def regenerate_job_cache(): + s3.get_s3_files() + + @notify_celery.task(name="process-incomplete-jobs") def process_incomplete_jobs(job_ids): jobs = [dao_get_job_by_id(job_id) for job_id in job_ids] diff --git a/app/config.py b/app/config.py index 8d913bdd8..65ef6b2d3 100644 --- a/app/config.py +++ b/app/config.py @@ -249,6 +249,11 @@ class Config(object): "schedule": crontab(hour=6, minute=0), "options": {"queue": QueueNames.PERIODIC}, }, + "regenerate-job-cache": { + "task": "regenerate-job-cache", + "schedule": crontab(minute="*/30"), + "options": {"queue": QueueNames.PERIODIC}, + }, "cleanup-unfinished-jobs": { "task": "cleanup-unfinished-jobs", "schedule": crontab(hour=4, minute=5),