diff --git a/.ds.baseline b/.ds.baseline index 26b862646..1c279e018 100644 --- a/.ds.baseline +++ b/.ds.baseline @@ -209,7 +209,7 @@ "filename": "tests/app/aws/test_s3.py", "hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747", "is_verified": false, - "line_number": 28, + "line_number": 29, "is_secret": false } ], @@ -384,5 +384,5 @@ } ] }, - "generated_at": "2024-09-26T14:17:05Z" + "generated_at": "2024-09-27T16:42:53Z" } diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 14128b0f8..57863effb 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -54,7 +54,7 @@ jobs: - name: Check for dead code run: make dead-code - name: Run tests with coverage - run: poetry run coverage run --omit=*/notifications_utils/*,*/migrations/* -m pytest --maxfail=10 + run: poetry run coverage run --omit=*/migrations/* -m pytest --maxfail=10 env: SQLALCHEMY_DATABASE_TEST_URI: postgresql://user:password@localhost:5432/test_notification_api NOTIFY_E2E_TEST_EMAIL: ${{ secrets.NOTIFY_E2E_TEST_EMAIL }} diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index e5436d01f..407150428 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -55,7 +55,7 @@ jobs: run: poetry export --without-hashes --format=requirements.txt > requirements.txt - name: Deploy to cloud.gov - uses: 18f/cg-deploy-action@main + uses: cloud-gov/cg-cli-tools@main env: DANGEROUS_SALT: ${{ secrets.DANGEROUS_SALT }} SECRET_KEY: ${{ secrets.SECRET_KEY }} diff --git a/Makefile b/Makefile index 175f630c4..a0fd86ae4 100644 --- a/Makefile +++ b/Makefile @@ -81,7 +81,7 @@ test: ## Run tests and create coverage report poetry run black . poetry run flake8 . poetry run isort --check-only ./app ./tests - poetry run coverage run --omit=*/notifications_utils/*,*/migrations/* -m pytest --maxfail=10 + poetry run coverage run --omit=*/migrations/* -m pytest --maxfail=10 poetry run coverage report -m --fail-under=95 poetry run coverage html -d .coverage_cache diff --git a/app/aws/s3.py b/app/aws/s3.py index bbb06e602..4272b2887 100644 --- a/app/aws/s3.py +++ b/app/aws/s3.py @@ -2,13 +2,12 @@ import datetime import re import time from concurrent.futures import ThreadPoolExecutor +from multiprocessing import Manager import botocore from boto3 import Session -from expiringdict import ExpiringDict from flask import current_app -from app import redis_store from app.clients import AWS_CLIENT_CONFIG from notifications_utils import aware_utcnow @@ -16,17 +15,30 @@ FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv" # Temporarily extend cache to 7 days ttl = 60 * 60 * 24 * 7 -JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl) +manager = Manager() +job_cache = manager.dict() -JOBS_CACHE_HITS = "JOBS_CACHE_HITS" -JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES" - # Global variable s3_client = None s3_resource = None +def set_job_cache(job_cache, key, value): + job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60) + + +def clean_cache(): + current_time = time.time() + keys_to_delete = [] + for key, (_, expiry_time) in job_cache.items(): + if expiry_time < current_time: + keys_to_delete.append(key) + + for key in keys_to_delete: + del job_cache[key] + + def get_s3_client(): global s3_client if s3_client is None: @@ -88,7 +100,6 @@ def get_bucket_name(): def cleanup_old_s3_objects(): - bucket_name = get_bucket_name() s3_client = get_s3_client() @@ -100,9 +111,15 @@ def cleanup_old_s3_objects(): while True: for obj in response.get("Contents", []): if obj["LastModified"] <= 
time_limit: - current_app.logger.info( - f"#delete-old-s3-objects Wanting to delete: {obj['LastModified']} {obj['Key']}" - ) + + try: + remove_csv_object(obj["Key"]) + current_app.logger.info( + f"#delete-old-s3-objects Deleted: {obj['LastModified']} {obj['Key']}" + ) + except botocore.exceptions.ClientError: + current_app.logger.exception(f"Couldn't delete {obj['Key']}") + if "NextContinuationToken" in response: response = s3_client.list_objects_v2( Bucket=bucket_name, @@ -131,8 +148,8 @@ def read_s3_file(bucket_name, object_key, s3res): putting a list of all phone numbers into the cache as well. This means that when the report needs to be regenerated, it - can easily find the phone numbers in the cache through JOBS[_phones] - and the personalization through JOBS[_personalisation], which + can easily find the phone numbers in the cache through job_cache[_phones] + and the personalization through job_cache[_personalisation], which in theory should make report generation a lot faster. We are moving processing from the front end where the user can see it @@ -140,7 +157,7 @@ def read_s3_file(bucket_name, object_key, s3res): """ try: job_id = get_job_id_from_s3_object_key(object_key) - if JOBS.get(job_id) is None: + if job_cache.get(job_id) is None: object = ( s3res.Object(bucket_name, object_key) .get()["Body"] @@ -148,9 +165,13 @@ def read_s3_file(bucket_name, object_key, s3res): .decode("utf-8") ) if "phone number" in object.lower(): - JOBS[job_id] = object - JOBS[f"{job_id}_phones"] = extract_phones(object) - JOBS[f"{job_id}_personalisation"] = extract_personalisation(object) + set_job_cache(job_cache, job_id, object) + set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object)) + set_job_cache( + job_cache, + f"{job_id}_personalisation", + extract_personalisation(object), + ) except LookupError: # perhaps our key is not formatted as we expected. If so skip it. 
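The hunks above replace the in-process `ExpiringDict` with a `multiprocessing.Manager` dict, and every entry now goes through `set_job_cache()` as a `(value, expiry)` tuple. A minimal standalone sketch of that convention, using a plain dict in place of `Manager().dict()` and purely illustrative key names/values, so the tuple unpacking in the later hunks is easier to follow:

```python
# Standalone sketch of the (value, expiry) convention introduced above.
# A plain dict stands in for multiprocessing.Manager().dict(); the key name
# and sample value are illustrative only.
import time

TTL_SECONDS = 8 * 24 * 60 * 60  # matches the 8-day expiry used by set_job_cache()

job_cache = {}


def set_job_cache(cache, key, value):
    # Store the value together with an absolute expiry timestamp.
    cache[key] = (value, time.time() + TTL_SECONDS)


def clean_cache(cache):
    # Sweep every entry whose expiry timestamp has passed.
    now = time.time()
    for key in [k for k, (_, expiry) in cache.items() if expiry < now]:
        del cache[key]


set_job_cache(job_cache, "job-123_phones", {0: "+15555550100"})
value, expiry = job_cache["job-123_phones"]  # readers must unpack index [0]
assert value[0] == "+15555550100"
```

The real `clean_cache()` in the hunk above takes no arguments and sweeps the shared module-level `job_cache`; the parameterised version here is only to keep the sketch self-contained.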
@@ -167,13 +188,13 @@ def get_s3_files(): s3res = get_s3_resource() current_app.logger.info( - f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200" + f"job_cache length before regen: {len(job_cache)} #notify-admin-1200" ) with ThreadPoolExecutor() as executor: executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys) current_app.logger.info( - f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200" + f"job_cache length after regen: {len(job_cache)} #notify-admin-1200" ) @@ -311,20 +332,6 @@ def get_job_from_s3(service_id, job_id): return None -def incr_jobs_cache_misses(): - if not redis_store.get(JOBS_CACHE_MISSES): - redis_store.set(JOBS_CACHE_MISSES, 1) - else: - redis_store.incr(JOBS_CACHE_MISSES) - - -def incr_jobs_cache_hits(): - if not redis_store.get(JOBS_CACHE_HITS): - redis_store.set(JOBS_CACHE_HITS, 1) - else: - redis_store.incr(JOBS_CACHE_HITS) - - def extract_phones(job): job = job.split("\r\n") first_row = job[0] @@ -333,7 +340,7 @@ def extract_phones(job): phone_index = 0 for item in first_row: # Note: may contain a BOM and look like \ufeffphone number - if "phone number" in item.lower(): + if item.lower() in ["phone number", "\\ufeffphone number"]: break phone_index = phone_index + 1 @@ -357,7 +364,8 @@ def extract_phones(job): def extract_personalisation(job): - job = job.split("\r\n") + + job = job[0].split("\r\n") first_row = job[0] job.pop(0) first_row = first_row.split(",") @@ -372,18 +380,15 @@ def extract_personalisation(job): def get_phone_number_from_s3(service_id, job_id, job_row_number): - # We don't want to constantly pull down a job from s3 every time we need a phone number. - # At the same time we don't want to store it in redis or the db - # So this is a little recycling mechanism to reduce the number of downloads. 
- job = JOBS.get(job_id) + job = job_cache.get(job_id) if job is None: current_app.logger.info(f"job {job_id} was not in the cache") job = get_job_from_s3(service_id, job_id) # Even if it is None, put it here to avoid KeyErrors - JOBS[job_id] = job - incr_jobs_cache_misses() + set_job_cache(job_cache, job_id, job) else: - incr_jobs_cache_hits() + # skip expiration date from cache, we don't need it here + job = job[0] if job is None: current_app.logger.error( @@ -391,24 +396,19 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number): ) return "Unavailable" - # If we look in the JOBS cache for the quick lookup dictionary of phones for a given job + # If we look in the job_cache for the quick lookup dictionary of phones for a given job # and that dictionary is not there, create it - if JOBS.get(f"{job_id}_phones") is None: - JOBS[f"{job_id}_phones"] = extract_phones(job) + if job_cache.get(f"{job_id}_phones") is None: + phones = extract_phones(job) + set_job_cache(job_cache, f"{job_id}_phones", phones) # If we can find the quick dictionary, use it - if JOBS.get(f"{job_id}_phones") is not None: - phone_to_return = JOBS.get(f"{job_id}_phones").get(job_row_number) - if phone_to_return: - return phone_to_return - else: - current_app.logger.warning( - f"Was unable to retrieve phone number from lookup dictionary for job {job_id}" - ) - return "Unavailable" + phone_to_return = phones[job_row_number] + if phone_to_return: + return phone_to_return else: - current_app.logger.error( - f"Was unable to construct lookup dictionary for job {job_id}" + current_app.logger.warning( + f"Was unable to retrieve phone number from lookup dictionary for job {job_id}" ) return "Unavailable" @@ -417,13 +417,10 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number): # We don't want to constantly pull down a job from s3 every time we need the personalisation. # At the same time we don't want to store it in redis or the db # So this is a little recycling mechanism to reduce the number of downloads. 
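In the phone-number hunk above, `phones` is bound only on a cache miss before it is indexed. A hedged sketch of the same lookup with both branches binding `phones` and the `(value, expiry)` tuple unpacked on a hit; the `lookup_phone` wrapper and its signature are hypothetical and not part of this diff, while `extract_phones()` and `set_job_cache()` are the helpers defined in `app/aws/s3.py` above:

```python
# Hypothetical wrapper (not in this diff): phone lookup with both cache
# branches binding `phones`. Assumes extract_phones() and set_job_cache()
# from app/aws/s3.py are in scope.
def lookup_phone(job_cache, job_id, job, job_row_number):
    cached = job_cache.get(f"{job_id}_phones")
    if cached is None:
        phones = extract_phones(job)
        set_job_cache(job_cache, f"{job_id}_phones", phones)
    else:
        phones = cached[0]  # cached entries are (value, expiry) tuples

    phone_to_return = phones.get(job_row_number)
    if phone_to_return:
        return phone_to_return
    return "Unavailable"
```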
- job = JOBS.get(job_id) + job = job_cache.get(job_id) if job is None: job = get_job_from_s3(service_id, job_id) - JOBS[job_id] = job - incr_jobs_cache_misses() - else: - incr_jobs_cache_hits() + set_job_cache(job_cache, job_id, job) # If the job is None after our attempt to retrieve it from s3, it # probably means the job is old and has been deleted from s3, in @@ -436,14 +433,16 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number): ) return {} - # If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job + # If we look in the job_cache for the quick lookup dictionary of personalisations for a given job # and that dictionary is not there, create it - if JOBS.get(f"{job_id}_personalisation") is None: - JOBS[f"{job_id}_personalisation"] = extract_personalisation(job) + if job_cache.get(f"{job_id}_personalisation") is None: + set_job_cache( + job_cache, f"{job_id}_personalisation", extract_personalisation(job) + ) # If we can find the quick dictionary, use it - if JOBS.get(f"{job_id}_personalisation") is not None: - personalisation_to_return = JOBS.get(f"{job_id}_personalisation").get( + if job_cache.get(f"{job_id}_personalisation") is not None: + personalisation_to_return = job_cache.get(f"{job_id}_personalisation")[0].get( job_row_number ) if personalisation_to_return: diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 87df0ca83..c8ad8cc6d 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -446,6 +446,11 @@ def regenerate_job_cache(): s3.get_s3_files() +@notify_celery.task(name="clean-job-cache") +def clean_job_cache(): + s3.clean_cache() + + @notify_celery.task(name="delete-old-s3-objects") def delete_old_s3_objects(): s3.cleanup_old_s3_objects() diff --git a/app/config.py b/app/config.py index 9a4412615..4a8c880d3 100644 --- a/app/config.py +++ b/app/config.py @@ -251,12 +251,12 @@ class Config(object): }, "delete_old_s3_objects": { "task": "delete-old-s3-objects", - "schedule": crontab(minute="*/5"), + "schedule": crontab(hour=7, minute=10), "options": {"queue": QueueNames.PERIODIC}, }, "regenerate-job-cache": { "task": "regenerate-job-cache", - "schedule": crontab(minute="*/3"), + "schedule": crontab(minute="*/30"), "options": {"queue": QueueNames.PERIODIC}, }, "regenerate-job-cache-on-startup": { @@ -269,6 +269,11 @@ class Config(object): "expires": 60, }, # Ensure it doesn't run if missed }, + "clean-job-cache": { + "task": "clean-job-cache", + "schedule": crontab(hour=2, minute=11), + "options": {"queue": QueueNames.PERIODIC}, + }, "cleanup-unfinished-jobs": { "task": "cleanup-unfinished-jobs", "schedule": crontab(hour=4, minute=5), diff --git a/tests/app/aws/test_s3.py b/tests/app/aws/test_s3.py index 17222d2f0..8e3863d5c 100644 --- a/tests/app/aws/test_s3.py +++ b/tests/app/aws/test_s3.py @@ -1,4 +1,5 @@ import os +from datetime import timedelta from os import getenv import pytest @@ -32,15 +33,30 @@ def single_s3_object_stub(key="foo", last_modified=None): def test_cleanup_old_s3_objects(mocker): + """ + Currently we are going to delete s3 objects if they are more than 14 days old, + because we want to delete all jobs older than 7 days, and jobs can be scheduled + three days in advance, and on top of that we want to leave a little cushion for + the time being. This test shows that a 3 day old job ("B") is not deleted, + whereas a 30 day old job ("A") is. 
+ """ mocker.patch("app.aws.s3.get_bucket_name", return_value="Bucket") + mock_s3_client = mocker.Mock() mocker.patch("app.aws.s3.get_s3_client", return_value=mock_s3_client) + mock_remove_csv_object = mocker.patch("app.aws.s3.remove_csv_object") + lastmod30 = aware_utcnow() - timedelta(days=30) + lastmod3 = aware_utcnow() - timedelta(days=3) mock_s3_client.list_objects_v2.return_value = { - "Contents": [{"Key": "A", "LastModified": aware_utcnow()}] + "Contents": [ + {"Key": "A", "LastModified": lastmod30}, + {"Key": "B", "LastModified": lastmod3}, + ] } cleanup_old_s3_objects() mock_s3_client.list_objects_v2.assert_called_with(Bucket="Bucket") + mock_remove_csv_object.assert_called_once_with("A") def test_get_s3_file_makes_correct_call(notify_api, mocker): @@ -96,7 +112,6 @@ def test_get_s3_file_makes_correct_call(notify_api, mocker): def test_get_phone_number_from_s3( mocker, job, job_id, job_row_number, expected_phone_number ): - mocker.patch("app.aws.s3.redis_store") get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3") get_job_mock.return_value = job phone_number = get_phone_number_from_s3("service_id", job_id, job_row_number) @@ -176,7 +191,6 @@ def test_get_job_from_s3_exponential_backoff_file_not_found(mocker): def test_get_personalisation_from_s3( mocker, job, job_id, job_row_number, expected_personalisation ): - mocker.patch("app.aws.s3.redis_store") get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3") get_job_mock.return_value = job personalisation = get_personalisation_from_s3("service_id", job_id, job_row_number)
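The new `clean-job-cache` beat entry in the `config.py` hunk runs `s3.clean_cache()` nightly, but no test for it appears in this diff. A possible companion test, offered only as a sketch: it seeds the shared module-level `job_cache` with one already-expired and one live `(value, expiry)` entry and checks that only the expired one is swept.

```python
# Hypothetical companion test (not part of this diff) for the new
# clean_cache() helper in app/aws/s3.py.
import time

from app.aws import s3


def test_clean_cache_removes_only_expired_entries():
    s3.job_cache["expired-job"] = ("old payload", time.time() - 60)
    s3.job_cache["live-job"] = ("new payload", time.time() + 60)

    s3.clean_cache()

    assert "expired-job" not in s3.job_cache
    assert "live-job" in s3.job_cache
```

Because `job_cache` is a shared Manager dict created at module import, a test along these lines would also need to remove the keys it inserts (or reset the dict) so state does not leak into other tests.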