Files
notifications-api/app/notifications/sns_cert_validator.py

125 lines
3.7 KiB
Python
Raw Normal View History

import base64
import re
from urllib.parse import urlparse
2022-09-23 15:57:06 -07:00
import oscrypto.asymmetric
import oscrypto.errors
2022-09-30 10:59:48 -04:00
import requests
import six
from app import redis_store
from app.config import Config
2022-10-04 17:42:04 -07:00
VALIDATE_SNS_TOPICS = Config.VALIDATE_SNS_TOPICS
2022-09-20 20:22:12 -07:00
VALID_SNS_TOPICS = Config.VALID_SNS_TOPICS
_signing_cert_cache = {}
_cert_url_re = re.compile(
2023-08-29 14:54:30 -07:00
r"sns\.([a-z]{1,3}(?:-gov)?-[a-z]+-[0-9]{1,2})\.amazonaws\.com",
)
2022-10-14 14:45:27 +00:00
2022-09-23 15:57:06 -07:00
class ValidationError(Exception):
"""
ValidationError. Raised when a message fails integrity checks.
"""
def get_certificate(url):
2022-10-04 17:42:04 -07:00
res = redis_store.get(url)
if res is not None:
return res
2022-10-04 17:42:04 -07:00
res = requests.get(url).text
redis_store.set(url, res, ex=60 * 60) # 60 minutes
2022-10-04 18:03:38 -07:00
_signing_cert_cache[url] = res
2022-10-04 17:42:04 -07:00
return res
2022-09-23 15:57:06 -07:00
def validate_arn(sns_payload):
2022-10-04 17:42:04 -07:00
if VALIDATE_SNS_TOPICS:
2023-08-29 14:54:30 -07:00
arn = sns_payload.get("TopicArn")
2023-01-31 17:27:17 -05:00
if arn not in VALID_SNS_TOPICS:
2022-09-23 15:57:06 -07:00
raise ValidationError("Invalid Topic Name")
2022-09-23 15:57:06 -07:00
def get_string_to_sign(sns_payload):
2023-08-29 14:54:30 -07:00
payload_type = sns_payload.get("Type")
if payload_type in ["SubscriptionConfirmation", "UnsubscribeConfirmation"]:
fields = [
"Message",
"MessageId",
"SubscribeURL",
"Timestamp",
"Token",
"TopicArn",
"Type",
]
elif payload_type == "Notification":
fields = ["Message", "MessageId", "Subject", "Timestamp", "TopicArn", "Type"]
else:
2022-09-23 15:57:06 -07:00
raise ValidationError("Unexpected Message Type")
2023-08-29 14:54:30 -07:00
string_to_sign = ""
for field in fields:
field_value = sns_payload.get(field)
if not isinstance(field_value, str):
2023-08-29 14:54:30 -07:00
if field == "Subject" and field_value is None:
2022-09-23 15:57:06 -07:00
continue
raise ValidationError(f"In {field}, found non-string value: {field_value}")
2023-08-29 14:54:30 -07:00
string_to_sign += field + "\n" + field_value + "\n"
2022-09-23 15:57:06 -07:00
if isinstance(string_to_sign, six.text_type):
string_to_sign = string_to_sign.encode()
return string_to_sign
2022-09-23 15:57:06 -07:00
2022-10-03 09:05:34 -07:00
def validate_sns_cert(sns_payload):
2022-09-23 15:57:06 -07:00
"""
Adapted from the solution posted at
https://github.com/boto/boto3/issues/2508#issuecomment-992931814
2022-10-03 09:05:34 -07:00
Modified to swap m2crypto for oscrypto
2022-09-23 15:57:06 -07:00
"""
if not isinstance(sns_payload, dict):
2023-08-29 14:54:30 -07:00
raise ValidationError(
"Unexpected message type {!r}".format(type(sns_payload).__name__)
)
2022-09-23 15:57:06 -07:00
# Amazon SNS currently supports signature version 1.
2023-08-29 14:54:30 -07:00
if sns_payload.get("SignatureVersion") != "1":
2022-09-23 15:57:06 -07:00
raise ValidationError("Wrong Signature Version (expected 1)")
2022-10-14 14:45:27 +00:00
2022-09-23 15:57:06 -07:00
validate_arn(sns_payload)
2022-10-14 14:45:27 +00:00
2022-09-23 15:57:06 -07:00
string_to_sign = get_string_to_sign(sns_payload)
# Key signing cert url via Lambda and via webhook are slightly different
2023-08-29 14:54:30 -07:00
signing_cert_url = (
sns_payload.get("SigningCertUrl")
if "SigningCertUrl" in sns_payload
else sns_payload.get("SigningCertURL")
)
if not isinstance(signing_cert_url, str):
2022-09-23 15:57:06 -07:00
raise ValidationError("Signing cert url must be a string")
cert_scheme, cert_netloc, *_ = urlparse(signing_cert_url)
2023-08-29 14:54:30 -07:00
if cert_scheme != "https" or not re.match(_cert_url_re, cert_netloc):
2022-09-23 15:57:06 -07:00
raise ValidationError("Cert does not appear to be from AWS")
2022-10-14 14:45:27 +00:00
certificate = _signing_cert_cache.get(signing_cert_url)
if certificate is None:
2022-09-23 15:57:06 -07:00
certificate = get_certificate(signing_cert_url)
if isinstance(certificate, six.text_type):
certificate = certificate.encode()
2022-10-14 14:45:27 +00:00
2022-09-23 15:57:06 -07:00
signature = base64.b64decode(sns_payload["Signature"])
2022-09-23 15:57:06 -07:00
try:
oscrypto.asymmetric.rsa_pkcs1v15_verify(
oscrypto.asymmetric.load_certificate(certificate),
signature,
string_to_sign,
2023-08-29 14:54:30 -07:00
"sha1",
2022-09-23 15:57:06 -07:00
)
return True
except oscrypto.errors.SignatureError:
2022-10-14 14:45:27 +00:00
raise ValidationError("Invalid signature")