Files
notifications-api/notifications_utils/s3.py

46 lines
1.2 KiB
Python
Raw Normal View History

import botocore
from boto3 import Session
from botocore.config import Config
from flask import current_app
# botocore configuration passed to the S3 resource created in this module.
AWS_CLIENT_CONFIG = Config(
    # FIPS endpoints are switched on here; the virtual addressing style set
    # below is required for S3 to connect to FIPS-enabled endpoints.
    # See https://aws.amazon.com/compliance/fips/ for more information.
    use_fips_endpoint=True,
    s3={"addressing_style": "virtual"},
    max_pool_connections=50,
)

# NOTE(review): not referenced in the visible part of this module; kept
# because other code may read it.
default_regions = "us-gov-west-1"
def get_s3_resource():
    """Build a boto3 S3 resource from the app's CSV upload bucket settings.

    Reads ``access_key_id``, ``secret_access_key`` and ``region`` from
    ``current_app.config["CSV_UPLOAD_BUCKET"]``, creates a new boto3
    ``Session`` with those values, and returns that session's ``"s3"``
    resource configured with ``AWS_CLIENT_CONFIG``.

    Returns:
        The S3 service resource produced by ``Session.resource``.

    Raises:
        KeyError: If the bucket settings or one of the three keys is missing.
    """
    bucket_settings = current_app.config["CSV_UPLOAD_BUCKET"]
    return Session(
        aws_access_key_id=bucket_settings["access_key_id"],
        aws_secret_access_key=bucket_settings["secret_access_key"],
        region_name=bucket_settings["region"],
    ).resource("s3", config=AWS_CLIENT_CONFIG)
class S3ObjectNotFound(botocore.exceptions.ClientError):
    """Raised by ``s3download`` when fetching an object from S3 fails."""
def s3download(
    bucket_name,
    filename,
):
    """Fetch an object from S3 and return the body of the response.

    Args:
        bucket_name: Name of the bucket holding the object.
        filename: Key of the object within the bucket.

    Returns:
        The ``"Body"`` entry of the object's ``get()`` response.

    Raises:
        S3ObjectNotFound: If botocore raises a ``ClientError`` while the
            object is being fetched. Every ``ClientError`` is mapped to this
            type, not only missing-key errors; the original error is
            available as ``__cause__``.
    """
    try:
        s3 = get_s3_resource()
        key = s3.Object(bucket_name, filename)
        return key.get()["Body"]
    except botocore.exceptions.ClientError as error:
        # Chain explicitly so the underlying AWS error stays attached to the
        # exception callers receive.
        raise S3ObjectNotFound(error.response, error.operation_name) from error