merge from main

Author: Kenneth Kehl
Date: 2024-10-01 14:38:23 -07:00
12 changed files with 457 additions and 1072 deletions

View File

@@ -209,7 +209,7 @@
"filename": "tests/app/aws/test_s3.py",
"hashed_secret": "67a74306b06d0c01624fe0d0249a570f4d093747",
"is_verified": false,
"line_number": 27,
"line_number": 29,
"is_secret": false
}
],
@@ -384,5 +384,5 @@
}
]
},
"generated_at": "2024-09-10T18:12:39Z"
"generated_at": "2024-09-27T16:42:53Z"
}

View File

@@ -54,7 +54,7 @@ jobs:
- name: Check for dead code
run: make dead-code
- name: Run tests with coverage
run: poetry run coverage run --omit=*/notifications_utils/*,*/migrations/* -m pytest --maxfail=10
run: poetry run coverage run --omit=*/migrations/*,*/tests/* -m pytest --maxfail=10
env:
SQLALCHEMY_DATABASE_TEST_URI: postgresql://user:password@localhost:5432/test_notification_api
NOTIFY_E2E_TEST_EMAIL: ${{ secrets.NOTIFY_E2E_TEST_EMAIL }}
@@ -63,7 +63,7 @@ jobs:
NOTIFY_E2E_TEST_PASSWORD: ${{ secrets.NOTIFY_E2E_TEST_PASSWORD }}
- name: Check coverage threshold
# TODO get this back up to 95
run: poetry run coverage report -m --fail-under=95
run: poetry run coverage report -m --fail-under=91
validate-new-relic-config:
runs-on: ubuntu-latest

View File

@@ -70,15 +70,7 @@ jobs:
cf_password: ${{ secrets.CLOUDGOV_PASSWORD }}
cf_org: gsa-tts-benefits-studio
cf_space: notify-staging
push_arguments: >-
--vars-file deploy-config/staging.yml
--var DANGEROUS_SALT="$DANGEROUS_SALT"
--var SECRET_KEY="$SECRET_KEY"
--var ADMIN_CLIENT_SECRET="$ADMIN_CLIENT_SECRET"
--var NEW_RELIC_LICENSE_KEY="$NEW_RELIC_LICENSE_KEY"
--var NOTIFY_E2E_TEST_EMAIL="$NOTIFY_E2E_TEST_EMAIL"
--var NOTIFY_E2E_TEST_PASSWORD="$NOTIFY_E2E_TEST_PASSWORD"
--var LOGIN_DOT_GOV_REGISTRATION_URL="$LOGIN_DOT_GOV_REGISTRATION_URL"
cf_command: "push -f manifest.yml --vars-file deploy-config/staging.yml --var var-name=${{ secrets.DANGEROUS_SALT }} --var var-name=${{ secrets.SECRET_KEY }} --var var-name=${{ secrets.ADMIN_CLIENT_SECRET }} --var var-name=${{ secrets.NEW_RELIC_LICENSE_KEY }} --var var-name=${{ secrets.NOTIFY_E2E_TEST_EMAIL }} --var var-name=${{ secrets.NOTIFY_E2E_TEST_PASSWORD }} --var LOGIN_DOT_GOV_REGISTRATION_URL=\"${{ env.LOGIN_DOT_GOV_REGISTRATION_URL }}\" --strategy rolling"
- name: Check for changes to templates.json
id: changed-templates

View File

@@ -81,9 +81,10 @@ test: ## Run tests and create coverage report
poetry run black .
poetry run flake8 .
poetry run isort --check-only ./app ./tests
poetry run coverage run --omit=*/notifications_utils/*,*/migrations/* -m pytest --maxfail=10
poetry run coverage run --omit=*/migrations/*,*/tests/* -m pytest --maxfail=10
poetry run coverage report -m --fail-under=95
## TODO set this back to 95 asap
poetry run coverage report -m --fail-under=91
poetry run coverage html -d .coverage_cache
.PHONY: py-lock

View File

@@ -1,13 +1,13 @@
import datetime
import re
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager
import botocore
from boto3 import Session
from expiringdict import ExpiringDict
from flask import current_app
from app import redis_store
from app.clients import AWS_CLIENT_CONFIG
from notifications_utils import aware_utcnow
@@ -16,17 +16,30 @@ NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"
# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7
JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)
manager = Manager()
job_cache = manager.dict()
JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"
# Global variable
s3_client = None
s3_resource = None
def set_job_cache(job_cache, key, value):
job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)
def clean_cache():
current_time = time.time()
keys_to_delete = []
for key, (_, expiry_time) in job_cache.items():
if expiry_time < current_time:
keys_to_delete.append(key)
for key in keys_to_delete:
del job_cache[key]
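# Illustrative sketch, not part of this commit: a hypothetical reader for the
# (value, expiry_time) tuples that set_job_cache() writes. The real code below
# unpacks entries inline via job[0] instead.
def get_job_cache_value(job_cache, key):
    entry = job_cache.get(key)
    if entry is None:
        return None
    value, expiry_time = entry
    if expiry_time < time.time():
        # Expired entry: treat it as a miss and leave removal to clean_cache().
        return None
    return value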
def get_s3_client():
global s3_client
if s3_client is None:
@@ -88,7 +101,6 @@ def get_bucket_name():
def cleanup_old_s3_objects():
bucket_name = get_bucket_name()
s3_client = get_s3_client()
@@ -100,9 +112,15 @@ def cleanup_old_s3_objects():
while True:
for obj in response.get("Contents", []):
if obj["LastModified"] <= time_limit:
current_app.logger.info(
f"#delete-old-s3-objects Wanting to delete: {obj['LastModified']} {obj['Key']}"
)
try:
remove_csv_object(obj["Key"])
current_app.logger.info(
f"#delete-old-s3-objects Deleted: {obj['LastModified']} {obj['Key']}"
)
except botocore.exceptions.ClientError:
current_app.logger.exception(f"Couldn't delete {obj['Key']}")
if "NextContinuationToken" in response:
response = s3_client.list_objects_v2(
Bucket=bucket_name,
@@ -116,36 +134,70 @@ def cleanup_old_s3_objects():
)
def get_s3_files():
def get_job_id_from_s3_object_key(key):
object_arr = key.split("/")
job_id = object_arr[1] # get the job_id
job_id = job_id.replace(".csv", "") # we just want the job_id
return job_id
def read_s3_file(bucket_name, object_key, s3res):
"""
This method runs during the 'regenerate job cache' task.
Note that in addition to retrieving the jobs and putting them
into the cache, this method also does some pre-processing by
putting a list of all phone numbers into the cache as well.
This means that when the report needs to be regenerated, it
can easily find the phone numbers in the cache through job_cache[<job_id>_phones]
and the personalisation through job_cache[<job_id>_personalisation], which
in theory should make report generation a lot faster.
This moves processing off the front end, where the user would experience it
as wait time, and into this back-end process.
"""
try:
job_id = get_job_id_from_s3_object_key(object_key)
if job_cache.get(job_id) is None:
object = (
s3res.Object(bucket_name, object_key)
.get()["Body"]
.read()
.decode("utf-8")
)
set_job_cache(job_cache, job_id, object)
set_job_cache(job_cache, f"{job_id}_phones", extract_phones(object))
set_job_cache(
job_cache,
f"{job_id}_personalisation",
extract_personalisation(object),
)
except LookupError:
# perhaps our key is not formatted as we expected. If so skip it.
current_app.logger.exception("LookupError #notify-admin-1200")
def get_s3_files():
"""
We use a ThreadPoolExecutor here to parallelise the retrieval of S3
CSV files so that cache regeneration keeps up as usage scales.
"""
bucket_name = current_app.config["CSV_UPLOAD_BUCKET"]["bucket"]
objects = list_s3_objects()
object_keys = list_s3_objects()
s3res = get_s3_resource()
current_app.logger.info(
f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
)
for object in objects:
# We put our csv files in the format "service-{service_id}-notify/{job_id}"
try:
object_arr = object.split("/")
job_id = object_arr[1] # get the job_id
job_id = job_id.replace(".csv", "") # we just want the job_id
if JOBS.get(job_id) is None:
object = (
s3res.Object(bucket_name, object)
.get()["Body"]
.read()
.decode("utf-8")
)
if "phone number" in object.lower():
JOBS[job_id] = object
except LookupError:
# perhaps our key is not formatted as we expected. If so skip it.
current_app.logger.exception("LookupError #notify-admin-1200")
try:
with ThreadPoolExecutor() as executor:
executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)
except Exception:
current_app.logger.exception("Connection pool issue")
current_app.logger.info(
f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
)
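# Illustrative sketch, not part of this commit: ThreadPoolExecutor's default
# worker count can exceed botocore's HTTP connection pool
# (max_pool_connections, 10 in AWS_CLIENT_CONFIG), which makes urllib3 discard
# and reopen connections under load. One option is to cap the workers to the
# pool size; the helper name and the attribute access on AWS_CLIENT_CONFIG
# below are assumptions, not part of the commit.
def read_s3_files_bounded(bucket_name, object_keys, s3res):
    max_workers = AWS_CLIENT_CONFIG.max_pool_connections
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(lambda key: read_s3_file(bucket_name, key, s3res), object_keys)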
@@ -303,20 +355,6 @@ def get_job_from_s3(service_id, job_id):
return None
def incr_jobs_cache_misses():
if not redis_store.get(JOBS_CACHE_MISSES):
redis_store.set(JOBS_CACHE_MISSES, 1)
else:
redis_store.incr(JOBS_CACHE_MISSES)
def incr_jobs_cache_hits():
if not redis_store.get(JOBS_CACHE_HITS):
redis_store.set(JOBS_CACHE_HITS, 1)
else:
redis_store.incr(JOBS_CACHE_HITS)
def extract_phones(job):
job = job.split("\r\n")
first_row = job[0]
@@ -325,7 +363,7 @@ def extract_phones(job):
phone_index = 0
for item in first_row:
# Note: may contain a BOM and look like \ufeffphone number
if "phone number" in item.lower():
if item.lower() in ["phone number", "\\ufeffphone number"]:
break
phone_index = phone_index + 1
@@ -349,6 +387,8 @@ def extract_phones(job):
def extract_personalisation(job):
if isinstance(job, dict):
job = job[0]
job = job.split("\r\n")
first_row = job[0]
job.pop(0)
@@ -364,18 +404,15 @@ def extract_personalisation(job):
def get_phone_number_from_s3(service_id, job_id, job_row_number):
# We don't want to constantly pull down a job from s3 every time we need a phone number.
# At the same time we don't want to store it in redis or the db
# So this is a little recycling mechanism to reduce the number of downloads.
job = JOBS.get(job_id)
job = job_cache.get(job_id)
if job is None:
current_app.logger.info(f"job {job_id} was not in the cache")
job = get_job_from_s3(service_id, job_id)
# Even if it is None, put it here to avoid KeyErrors
JOBS[job_id] = job
incr_jobs_cache_misses()
set_job_cache(job_cache, job_id, job)
else:
incr_jobs_cache_hits()
# Drop the expiry time from the cache entry; we don't need it here.
job = job[0]
if job is None:
current_app.logger.error(
@@ -383,24 +420,16 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):
)
return "Unavailable"
# If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
# and that dictionary is not there, create it
if JOBS.get(f"{job_id}_phones") is None:
JOBS[f"{job_id}_phones"] = extract_phones(job)
phones = extract_phones(job)
set_job_cache(job_cache, f"{job_id}_phones", phones)
# If we can find the quick dictionary, use it
if JOBS.get(f"{job_id}_phones") is not None:
phone_to_return = JOBS.get(f"{job_id}_phones").get(job_row_number)
if phone_to_return:
return phone_to_return
else:
current_app.logger.warning(
f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
)
return "Unavailable"
phone_to_return = phones[job_row_number]
if phone_to_return:
return phone_to_return
else:
current_app.logger.error(
f"Was unable to construct lookup dictionary for job {job_id}"
current_app.logger.warning(
f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
)
return "Unavailable"
@@ -409,14 +438,15 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
# We don't want to constantly pull down a job from s3 every time we need the personalisation.
# At the same time we don't want to store it in redis or the db
# So this is a little recycling mechanism to reduce the number of downloads.
job = JOBS.get(job_id)
job = job_cache.get(job_id)
if job is None:
current_app.logger.info(f"job {job_id} was not in the cache")
job = get_job_from_s3(service_id, job_id)
JOBS[job_id] = job
incr_jobs_cache_misses()
# Even if it is None, put it here to avoid KeyErrors
set_job_cache(job_cache, job_id, job)
else:
incr_jobs_cache_hits()
# Drop the expiry time from the cache entry; we don't need it here.
job = job[0]
# If the job is None after our attempt to retrieve it from s3, it
# probably means the job is old and has been deleted from s3, in
# which case there is nothing we can do. It's unlikely to run into
@@ -428,14 +458,11 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
)
return {}
# If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
# and that dictionary is not there, create it
if JOBS.get(f"{job_id}_personalisation") is None:
JOBS[f"{job_id}_personalisation"] = extract_personalisation(job)
set_job_cache(job_cache, f"{job_id}_personalisation", extract_personalisation(job))
# If we can find the quick dictionary, use it
if JOBS.get(f"{job_id}_personalisation") is not None:
personalisation_to_return = JOBS.get(f"{job_id}_personalisation").get(
if job_cache.get(f"{job_id}_personalisation") is not None:
personalisation_to_return = job_cache.get(f"{job_id}_personalisation")[0].get(
job_row_number
)
if personalisation_to_return:

View File

@@ -446,6 +446,11 @@ def regenerate_job_cache():
s3.get_s3_files()
@notify_celery.task(name="clean-job-cache")
def clean_job_cache():
s3.clean_cache()
@notify_celery.task(name="delete-old-s3-objects")
def delete_old_s3_objects():
s3.cleanup_old_s3_objects()

View File

@@ -13,6 +13,10 @@ AWS_CLIENT_CONFIG = Config(
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# 10 is the default; it is spelled out here for documentation's sake.
# There may come a time when increasing it helps
# with job cache management.
max_pool_connections=10,
)
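# Illustrative sketch, not part of this commit: if job cache regeneration ever
# needs more concurrent S3 connections, this is the knob to raise. 50 below is
# an arbitrary example, and any Config options not shown in this diff are
# omitted.
#
#   AWS_CLIENT_CONFIG = Config(
#       s3={"addressing_style": "virtual"},
#       use_fips_endpoint=True,
#       max_pool_connections=50,
#   )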

View File

@@ -251,7 +251,7 @@ class Config(object):
},
"delete_old_s3_objects": {
"task": "delete-old-s3-objects",
"schedule": crontab(minute="*/5"),
"schedule": crontab(hour=7, minute=10),
"options": {"queue": QueueNames.PERIODIC},
},
"regenerate-job-cache": {
@@ -269,6 +269,11 @@ class Config(object):
"expires": 60,
}, # Ensure it doesn't run if missed
},
"clean-job-cache": {
"task": "clean-job-cache",
"schedule": crontab(hour=2, minute=11),
"options": {"queue": QueueNames.PERIODIC},
},
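# clean-job-cache runs daily at 02:11 in celery beat's configured timezone
# (UTC unless overridden), while delete-old-s3-objects above now runs daily
# at 07:10 rather than every five minutes.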
"cleanup-unfinished-jobs": {
"task": "cleanup-unfinished-jobs",
"schedule": crontab(hour=4, minute=5),

View File

@@ -36,17 +36,14 @@ def send_sms_to_provider(notification):
Get data for recipient, template,
notification and send it to sns.
"""
# we no longer store the personalisation in the db,
# need to retrieve from s3 before generating content
# However, we are still sending the initial verify code through personalisation
# so if there is some value there, don't overwrite it
if not notification.personalisation:
personalisation = get_personalisation_from_s3(
notification.service_id,
notification.job_id,
notification.job_row_number,
)
notification.personalisation = personalisation
# Take this path for report generation, where we know
# everything is in the cache.
personalisation = get_personalisation_from_s3(
notification.service_id,
notification.job_id,
notification.job_row_number,
)
notification.personalisation = personalisation
service = SerialisedService.from_id(notification.service_id)
message_id = None

View File

@@ -39,6 +39,12 @@ def init_app(app):
for logger_instance, handler in product(warning_loggers, handlers):
logger_instance.addHandler(handler)
logger_instance.setLevel(logging.WARNING)
# Suppress specific loggers to prevent leaking sensitive info
logging.getLogger("boto3").setLevel(logging.ERROR)
logging.getLogger("botocore").setLevel(logging.ERROR)
logging.getLogger("urllib3").setLevel(logging.ERROR)
app.logger.info("Logging configured")

View File

@@ -1,4 +1,5 @@
import os
from datetime import timedelta
from os import getenv
import pytest
@@ -8,6 +9,7 @@ from app.aws.s3 import (
cleanup_old_s3_objects,
file_exists,
get_job_from_s3,
get_job_id_from_s3_object_key,
get_personalisation_from_s3,
get_phone_number_from_s3,
get_s3_file,
@@ -31,15 +33,30 @@ def single_s3_object_stub(key="foo", last_modified=None):
def test_cleanup_old_s3_objects(mocker):
"""
Currently we delete S3 objects that are more than 14 days old: we want to
delete all jobs older than 7 days, jobs can be scheduled three days in
advance, and on top of that we want to leave a little cushion for the time
being. This test shows that a 3-day-old job ("B") is not deleted, whereas a
30-day-old job ("A") is.
"""
mocker.patch("app.aws.s3.get_bucket_name", return_value="Bucket")
mock_s3_client = mocker.Mock()
mocker.patch("app.aws.s3.get_s3_client", return_value=mock_s3_client)
mock_remove_csv_object = mocker.patch("app.aws.s3.remove_csv_object")
lastmod30 = aware_utcnow() - timedelta(days=30)
lastmod3 = aware_utcnow() - timedelta(days=3)
mock_s3_client.list_objects_v2.return_value = {
"Contents": [{"Key": "A", "LastModified": aware_utcnow()}]
"Contents": [
{"Key": "A", "LastModified": lastmod30},
{"Key": "B", "LastModified": lastmod3},
]
}
cleanup_old_s3_objects()
mock_s3_client.list_objects_v2.assert_called_with(Bucket="Bucket")
mock_remove_csv_object.assert_called_once_with("A")
def test_get_s3_file_makes_correct_call(notify_api, mocker):
@@ -95,13 +112,27 @@ def test_get_s3_file_makes_correct_call(notify_api, mocker):
def test_get_phone_number_from_s3(
mocker, job, job_id, job_row_number, expected_phone_number
):
mocker.patch("app.aws.s3.redis_store")
get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3")
get_job_mock.return_value = job
phone_number = get_phone_number_from_s3("service_id", job_id, job_row_number)
assert phone_number == expected_phone_number
@pytest.mark.parametrize(
"key, expected_job_id",
[
("service-blahblahblah-notify/abcde.csv", "abcde"),
(
"service-x-notify/4c99f361-4ed7-49b1-bd6f-02fe0c807c53.csv",
"4c99f361-4ed7-49b1-bd6f-02fe0c807c53",
),
],
)
def test_get_job_id_from_s3_object_key(key, expected_job_id):
actual_job_id = get_job_id_from_s3_object_key(key)
assert actual_job_id == expected_job_id
def mock_s3_get_object_slowdown(*args, **kwargs):
error_response = {
"Error": {
@@ -160,7 +191,6 @@ def test_get_job_from_s3_exponential_backoff_file_not_found(mocker):
def test_get_personalisation_from_s3(
mocker, job, job_id, job_row_number, expected_personalisation
):
mocker.patch("app.aws.s3.redis_store")
get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3")
get_job_mock.return_value = job
personalisation = get_personalisation_from_s3("service_id", job_id, job_row_number)

File diff suppressed because it is too large.