Merge branch 'main' into jim/091422/deliverycallbacks

This commit is contained in:
Jim Moffet
2022-09-30 11:21:46 -04:00
committed by GitHub
44 changed files with 917 additions and 88 deletions

View File

@@ -1,30 +1,30 @@
import os
import botocore
from boto3 import client, resource
from boto3 import Session, client
from flask import current_app
FILE_LOCATION_STRUCTURE = 'service-{}-notify/{}.csv'
default_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
default_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
default_region = os.environ.get('AWS_REGION')
def get_s3_file(bucket_name, file_location):
s3_file = get_s3_object(bucket_name, file_location)
def get_s3_file(bucket_name, file_location, access_key=default_access_key, secret_key=default_secret_key, region=default_region):
    """Download an S3 object and return its body decoded as a UTF-8 string."""
    s3_object = get_s3_object(bucket_name, file_location, access_key, secret_key, region)
    raw_body = s3_object.get()['Body'].read()
    return raw_body.decode('utf-8')
def get_s3_object(bucket_name, file_location):
s3 = resource('s3')
def get_s3_object(bucket_name, file_location, access_key=default_access_key, secret_key=default_secret_key, region=default_region):
    """Return a boto3 S3 Object handle built from an explicit credential set.

    Credentials default to the module-level AWS_* environment values.
    """
    aws_session = Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    return aws_session.resource('s3').Object(bucket_name, file_location)
def head_s3_object(bucket_name, file_location):
    """Return HEAD-object metadata for the given S3 key.

    See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.head_object
    """
    region = current_app.config['AWS_REGION']
    return client('s3', region).head_object(Bucket=bucket_name, Key=file_location)
def file_exists(bucket_name, file_location):
def file_exists(bucket_name, file_location, access_key=default_access_key, secret_key=default_secret_key, region=default_region):
try:
# try and access metadata of object
get_s3_object(bucket_name, file_location).metadata
get_s3_object(bucket_name, file_location, access_key, secret_key, region).metadata
return True
except botocore.exceptions.ClientError as e:
if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
@@ -36,6 +36,9 @@ def get_job_location(service_id, job_id):
return (
current_app.config['CSV_UPLOAD_BUCKET_NAME'],
FILE_LOCATION_STRUCTURE.format(service_id, job_id),
current_app.config['CSV_UPLOAD_ACCESS_KEY'],
current_app.config['CSV_UPLOAD_SECRET_KEY'],
current_app.config['CSV_UPLOAD_REGION'],
)
@@ -43,6 +46,9 @@ def get_contact_list_location(service_id, contact_list_id):
return (
current_app.config['CONTACT_LIST_BUCKET_NAME'],
FILE_LOCATION_STRUCTURE.format(service_id, contact_list_id),
current_app.config['CONTACT_LIST_ACCESS_KEY'],
current_app.config['CONTACT_LIST_SECRET_KEY'],
current_app.config['CONTACT_LIST_REGION'],
)
@@ -69,13 +75,21 @@ def remove_contact_list_from_s3(service_id, contact_list_id):
return remove_s3_object(*get_contact_list_location(service_id, contact_list_id))
def remove_s3_object(bucket_name, object_key):
obj = get_s3_object(bucket_name, object_key)
def remove_s3_object(bucket_name, object_key, access_key, secret_key, region):
    """Delete the given object from S3 and return the delete() response."""
    return get_s3_object(bucket_name, object_key, access_key, secret_key, region).delete()
def get_list_of_files_by_suffix(bucket_name, subfolder='', suffix='', last_modified=None):
s3_client = client('s3', current_app.config['AWS_REGION'])
def get_list_of_files_by_suffix(
bucket_name,
subfolder='',
suffix='',
last_modified=None,
access_key=default_access_key,
secret_key=default_secret_key,
region=default_region
):
s3_client = client('s3', region, aws_access_key_id=access_key, aws_secret_access_key=secret_key)
paginator = s3_client.get_paginator('list_objects_v2')
page_iterator = paginator.paginate(

View File

@@ -449,6 +449,7 @@ def handle_exception(task, notification, notification_id, exc):
# Sometimes, SQS plays the same message twice. We should be able to catch an IntegrityError, but it seems
# SQLAlchemy is throwing a FlushError. So we check if the notification id already exists then do not
# send to the retry queue.
# This probably (hopefully) is not an issue with Redis as the celery backing store
current_app.logger.exception('Retry' + retry_msg)
try:
task.retry(queue=QueueNames.RETRY, exc=exc)

View File

@@ -2,6 +2,12 @@ import json
import os
def find_by_service_name(services, service_name):
    """Return the first service whose 'name' equals service_name, or None.

    Args:
        services: iterable of dicts, each expected to carry a 'name' key
            (VCAP_SERVICES service entries).
        service_name: exact name to match.

    Returns:
        The matching service dict, or None when no entry matches.
    """
    # Iterate directly instead of indexing with range(len(...)); next() with a
    # default collapses the search-and-return-None pattern into one expression.
    return next((service for service in services if service['name'] == service_name), None)
def extract_cloudfoundry_config():
vcap_services = json.loads(os.environ['VCAP_SERVICES'])
@@ -9,3 +15,19 @@ def extract_cloudfoundry_config():
os.environ['SQLALCHEMY_DATABASE_URI'] = vcap_services['aws-rds'][0]['credentials']['uri'].replace('postgres','postgresql')
# Redis config
os.environ['REDIS_URL'] = vcap_services['aws-elasticache-redis'][0]['credentials']['uri'].replace('redis://','rediss://')
# CSV Upload Bucket Name
bucket_service = find_by_service_name(vcap_services['s3'], f"notifications-api-csv-upload-bucket-{os.environ['DEPLOY_ENV']}")
if bucket_service:
os.environ['CSV_UPLOAD_BUCKET_NAME'] = bucket_service['credentials']['bucket']
os.environ['CSV_UPLOAD_ACCESS_KEY'] = bucket_service['credentials']['access_key_id']
os.environ['CSV_UPLOAD_SECRET_KEY'] = bucket_service['credentials']['secret_access_key']
os.environ['CSV_UPLOAD_REGION'] = bucket_service['credentials']['region']
# Contact List Bucket Name
bucket_service = find_by_service_name(vcap_services['s3'], f"notifications-api-contact-list-bucket-{os.environ['DEPLOY_ENV']}")
if bucket_service:
os.environ['CONTACT_LIST_BUCKET_NAME'] = bucket_service['credentials']['bucket']
os.environ['CONTACT_LIST_ACCESS_KEY'] = bucket_service['credentials']['access_key_id']
os.environ['CONTACT_LIST_SECRET_KEY'] = bucket_service['credentials']['secret_access_key']
os.environ['CONTACT_LIST_REGION'] = bucket_service['credentials']['region']

View File

@@ -114,9 +114,6 @@ class Config(object):
FIRETEXT_API_KEY = os.environ.get("FIRETEXT_API_KEY", "placeholder")
FIRETEXT_INTERNATIONAL_API_KEY = os.environ.get("FIRETEXT_INTERNATIONAL_API_KEY", "placeholder")
# Prefix to identify queues in SQS
NOTIFICATION_QUEUE_PREFIX = os.environ.get('NOTIFICATION_QUEUE_PREFIX')
# Use notify.sandbox.10x sending domain unless overwritten by environment
NOTIFY_EMAIL_DOMAIN = 'notify.sandbox.10x.gsa.gov'
@@ -203,11 +200,9 @@ class Config(object):
DVLA_EMAIL_ADDRESSES = json.loads(os.environ.get('DVLA_EMAIL_ADDRESSES', '[]'))
CELERY = {
'broker_url': 'sqs://',
'broker_url': REDIS_URL,
'broker_transport_options': {
'region': AWS_REGION,
'visibility_timeout': 310,
'queue_name_prefix': NOTIFICATION_QUEUE_PREFIX,
},
'timezone': 'Europe/London',
'imports': [
@@ -418,14 +413,20 @@ class Development(Config):
REDIS_ENABLED = os.environ.get('REDIS_ENABLED')
CSV_UPLOAD_BUCKET_NAME = 'local-notifications-csv-upload'
CSV_UPLOAD_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
CSV_UPLOAD_SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
CSV_UPLOAD_REGION = os.environ.get('AWS_REGION', 'us-west-2')
CONTACT_LIST_BUCKET_NAME = 'local-contact-list'
TEST_LETTERS_BUCKET_NAME = 'development-test-letters'
DVLA_RESPONSE_BUCKET_NAME = 'notify.tools-ftp'
LETTERS_PDF_BUCKET_NAME = 'development-letters-pdf'
LETTERS_SCAN_BUCKET_NAME = 'development-letters-scan'
INVALID_PDF_BUCKET_NAME = 'development-letters-invalid-pdf'
TRANSIENT_UPLOADED_LETTERS = 'development-transient-uploaded-letters'
LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise'
CONTACT_LIST_ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
CONTACT_LIST_SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
CONTACT_LIST_REGION = os.environ.get('AWS_REGION', 'us-west-2')
# TEST_LETTERS_BUCKET_NAME = 'development-test-letters'
# DVLA_RESPONSE_BUCKET_NAME = 'notify.tools-ftp'
# LETTERS_PDF_BUCKET_NAME = 'development-letters-pdf'
# LETTERS_SCAN_BUCKET_NAME = 'development-letters-scan'
# INVALID_PDF_BUCKET_NAME = 'development-letters-invalid-pdf'
# TRANSIENT_UPLOADED_LETTERS = 'development-transient-uploaded-letters'
# LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise'
# INTERNAL_CLIENT_API_KEYS = {
# Config.ADMIN_CLIENT_ID: ['dev-notify-secret-key'],
@@ -440,11 +441,10 @@ class Development(Config):
NOTIFY_ENVIRONMENT = 'development'
NOTIFY_LOG_PATH = 'application.log'
NOTIFY_EMAIL_DOMAIN = os.getenv('NOTIFY_EMAIL_DOMAIN', 'notify.sandbox.10x.gsa.gov')
SQLALCHEMY_DATABASE_URI = os.environ.get('SQLALCHEMY_DATABASE_URI', 'postgresql://postgres:chummy@db:5432/notification_api')
REDIS_URL = os.environ.get('REDIS_URL')
ANTIVIRUS_ENABLED = os.environ.get('ANTIVIRUS_ENABLED') == '1'
@@ -473,13 +473,13 @@ class Test(Development):
CSV_UPLOAD_BUCKET_NAME = 'test-notifications-csv-upload'
CONTACT_LIST_BUCKET_NAME = 'test-contact-list'
TEST_LETTERS_BUCKET_NAME = 'test-test-letters'
DVLA_RESPONSE_BUCKET_NAME = 'test.notify.com-ftp'
LETTERS_PDF_BUCKET_NAME = 'test-letters-pdf'
LETTERS_SCAN_BUCKET_NAME = 'test-letters-scan'
INVALID_PDF_BUCKET_NAME = 'test-letters-invalid-pdf'
TRANSIENT_UPLOADED_LETTERS = 'test-transient-uploaded-letters'
LETTER_SANITISE_BUCKET_NAME = 'test-letters-sanitise'
# TEST_LETTERS_BUCKET_NAME = 'test-test-letters'
# DVLA_RESPONSE_BUCKET_NAME = 'test.notify.com-ftp'
# LETTERS_PDF_BUCKET_NAME = 'test-letters-pdf'
# LETTERS_SCAN_BUCKET_NAME = 'test-letters-scan'
# INVALID_PDF_BUCKET_NAME = 'test-letters-invalid-pdf'
# TRANSIENT_UPLOADED_LETTERS = 'test-transient-uploaded-letters'
# LETTER_SANITISE_BUCKET_NAME = 'test-letters-sanitise'
# this is overridden in CI
SQLALCHEMY_DATABASE_URI = os.getenv('SQLALCHEMY_DATABASE_TEST_URI', 'postgresql://postgres:chummy@db:5432/test_notification_api')
@@ -510,13 +510,13 @@ class Preview(Config):
NOTIFY_ENVIRONMENT = 'preview'
CSV_UPLOAD_BUCKET_NAME = 'preview-notifications-csv-upload'
CONTACT_LIST_BUCKET_NAME = 'preview-contact-list'
TEST_LETTERS_BUCKET_NAME = 'preview-test-letters'
DVLA_RESPONSE_BUCKET_NAME = 'notify.works-ftp'
LETTERS_PDF_BUCKET_NAME = 'preview-letters-pdf'
LETTERS_SCAN_BUCKET_NAME = 'preview-letters-scan'
INVALID_PDF_BUCKET_NAME = 'preview-letters-invalid-pdf'
TRANSIENT_UPLOADED_LETTERS = 'preview-transient-uploaded-letters'
LETTER_SANITISE_BUCKET_NAME = 'preview-letters-sanitise'
# TEST_LETTERS_BUCKET_NAME = 'preview-test-letters'
# DVLA_RESPONSE_BUCKET_NAME = 'notify.works-ftp'
# LETTERS_PDF_BUCKET_NAME = 'preview-letters-pdf'
# LETTERS_SCAN_BUCKET_NAME = 'preview-letters-scan'
# INVALID_PDF_BUCKET_NAME = 'preview-letters-invalid-pdf'
# TRANSIENT_UPLOADED_LETTERS = 'preview-transient-uploaded-letters'
# LETTER_SANITISE_BUCKET_NAME = 'preview-letters-sanitise'
FROM_NUMBER = 'preview'
API_RATE_LIMIT_ENABLED = True
CHECK_PROXY_HEADER = False
@@ -527,13 +527,13 @@ class Staging(Config):
NOTIFY_ENVIRONMENT = 'staging'
CSV_UPLOAD_BUCKET_NAME = 'staging-notifications-csv-upload'
CONTACT_LIST_BUCKET_NAME = 'staging-contact-list'
TEST_LETTERS_BUCKET_NAME = 'staging-test-letters'
DVLA_RESPONSE_BUCKET_NAME = 'staging-notify.works-ftp'
LETTERS_PDF_BUCKET_NAME = 'staging-letters-pdf'
LETTERS_SCAN_BUCKET_NAME = 'staging-letters-scan'
INVALID_PDF_BUCKET_NAME = 'staging-letters-invalid-pdf'
TRANSIENT_UPLOADED_LETTERS = 'staging-transient-uploaded-letters'
LETTER_SANITISE_BUCKET_NAME = 'staging-letters-sanitise'
# TEST_LETTERS_BUCKET_NAME = 'staging-test-letters'
# DVLA_RESPONSE_BUCKET_NAME = 'staging-notify.works-ftp'
# LETTERS_PDF_BUCKET_NAME = 'staging-letters-pdf'
# LETTERS_SCAN_BUCKET_NAME = 'staging-letters-scan'
# INVALID_PDF_BUCKET_NAME = 'staging-letters-invalid-pdf'
# TRANSIENT_UPLOADED_LETTERS = 'staging-transient-uploaded-letters'
# LETTER_SANITISE_BUCKET_NAME = 'staging-letters-sanitise'
FROM_NUMBER = 'stage'
API_RATE_LIMIT_ENABLED = True
CHECK_PROXY_HEADER = True
@@ -542,16 +542,22 @@ class Staging(Config):
class Live(Config):
NOTIFY_ENVIRONMENT = 'live'
# buckets
CSV_UPLOAD_BUCKET_NAME = 'notifications-prototype-csv-upload' # created in gsa sandbox
CONTACT_LIST_BUCKET_NAME = 'notifications-prototype-contact-list-upload' # created in gsa sandbox
CSV_UPLOAD_BUCKET_NAME = os.environ.get('CSV_UPLOAD_BUCKET_NAME', 'notifications-prototype-csv-upload') # created in gsa sandbox
CSV_UPLOAD_ACCESS_KEY = os.environ.get('CSV_UPLOAD_ACCESS_KEY')
CSV_UPLOAD_SECRET_KEY = os.environ.get('CSV_UPLOAD_SECRET_KEY')
CSV_UPLOAD_REGION = os.environ.get('CSV_UPLOAD_REGION')
CONTACT_LIST_BUCKET_NAME = os.environ.get('CONTACT_LIST_BUCKET_NAME', 'notifications-prototype-contact-list-upload') # created in gsa sandbox
CONTACT_LIST_ACCESS_KEY = os.environ.get('CONTACT_LIST_ACCESS_KEY')
CONTACT_LIST_SECRET_KEY = os.environ.get('CONTACT_LIST_SECRET_KEY')
CONTACT_LIST_REGION = os.environ.get('CONTACT_LIST_REGION')
# TODO: verify below buckets only used for letters
TEST_LETTERS_BUCKET_NAME = 'production-test-letters' # not created in gsa sandbox
DVLA_RESPONSE_BUCKET_NAME = 'notifications.service.gov.uk-ftp' # not created in gsa sandbox
LETTERS_PDF_BUCKET_NAME = 'production-letters-pdf' # not created in gsa sandbox
LETTERS_SCAN_BUCKET_NAME = 'production-letters-scan' # not created in gsa sandbox
INVALID_PDF_BUCKET_NAME = 'production-letters-invalid-pdf' # not created in gsa sandbox
TRANSIENT_UPLOADED_LETTERS = 'production-transient-uploaded-letters' # not created in gsa sandbox
LETTER_SANITISE_BUCKET_NAME = 'production-letters-sanitise' # not created in gsa sandbox
# TEST_LETTERS_BUCKET_NAME = 'production-test-letters' # not created in gsa sandbox
# DVLA_RESPONSE_BUCKET_NAME = 'notifications.service.gov.uk-ftp' # not created in gsa sandbox
# LETTERS_PDF_BUCKET_NAME = 'production-letters-pdf' # not created in gsa sandbox
# LETTERS_SCAN_BUCKET_NAME = 'production-letters-scan' # not created in gsa sandbox
# INVALID_PDF_BUCKET_NAME = 'production-letters-invalid-pdf' # not created in gsa sandbox
# TRANSIENT_UPLOADED_LETTERS = 'production-transient-uploaded-letters' # not created in gsa sandbox
# LETTER_SANITISE_BUCKET_NAME = 'production-letters-sanitise' # not created in gsa sandbox
FROM_NUMBER = 'US Notify'
API_RATE_LIMIT_ENABLED = True
@@ -563,7 +569,6 @@ class Live(Config):
REDIS_ENABLED = os.environ.get('REDIS_ENABLED')
NOTIFY_LOG_PATH = os.environ.get('NOTIFY_LOG_PATH', 'application.log')
REDIS_URL = os.environ.get('REDIS_URL')
class CloudFoundryConfig(Config):
@@ -576,12 +581,12 @@ class Sandbox(CloudFoundryConfig):
NOTIFY_ENVIRONMENT = 'sandbox'
CSV_UPLOAD_BUCKET_NAME = 'cf-sandbox-notifications-csv-upload'
CONTACT_LIST_BUCKET_NAME = 'cf-sandbox-contact-list'
LETTERS_PDF_BUCKET_NAME = 'cf-sandbox-letters-pdf'
TEST_LETTERS_BUCKET_NAME = 'cf-sandbox-test-letters'
DVLA_RESPONSE_BUCKET_NAME = 'notify.works-ftp'
LETTERS_PDF_BUCKET_NAME = 'cf-sandbox-letters-pdf'
LETTERS_SCAN_BUCKET_NAME = 'cf-sandbox-letters-scan'
INVALID_PDF_BUCKET_NAME = 'cf-sandbox-letters-invalid-pdf'
# LETTERS_PDF_BUCKET_NAME = 'cf-sandbox-letters-pdf'
# TEST_LETTERS_BUCKET_NAME = 'cf-sandbox-test-letters'
# DVLA_RESPONSE_BUCKET_NAME = 'notify.works-ftp'
# LETTERS_PDF_BUCKET_NAME = 'cf-sandbox-letters-pdf'
# LETTERS_SCAN_BUCKET_NAME = 'cf-sandbox-letters-scan'
# INVALID_PDF_BUCKET_NAME = 'cf-sandbox-letters-invalid-pdf'
FROM_NUMBER = 'sandbox'

View File

@@ -10,6 +10,7 @@ letter_job = Blueprint("letter-job", __name__)
register_errors(letter_job)
# too many references will make SQS error (as the task can only be 256kb)
# Maybe doesn't matter anymore with Redis as the celery backing store
MAX_REFERENCES_PER_TASK = 5000

View File

@@ -236,6 +236,7 @@ def process_sms_or_email_notification(
# If SQS cannot put the task on the queue, it's probably because the notification body was too long and it
# went over SQS's 256kb message limit. If the body is very large, it may exceed the HTTP max content length;
# the exception we get here isn't handled correctly by botocore - we get a ResponseParserError instead.
# Hopefully this is no longer an issue with Redis as celery's backing store
current_app.logger.info(
f'Notification {notification_id} failed to save to high volume queue. Using normal flow instead'
)