import base64
import re
from urllib.parse import urlparse

import oscrypto.asymmetric
import oscrypto.errors
import requests
import six

from app import redis_store
from app.config import Config

VALIDATE_SNS_TOPICS = Config.VALIDATE_SNS_TOPICS
VALID_SNS_TOPICS = Config.VALID_SNS_TOPICS

# Process-local cache of certificate bodies keyed by signing-cert URL. It is
# filled whenever get_certificate() has to download a certificate.
_signing_cert_cache = {}

# Host pattern for SNS signing-certificate URLs. It is applied with
# ``fullmatch`` so that nothing may follow the AWS domain: an unanchored match
# would accept hosts such as ``sns.us-east-1.amazonaws.com.evil.com`` or
# ``sns.us-east-1.amazonaws.com@evil.com``. The optional ``.cn`` suffix keeps
# ``amazonaws.com.cn`` hosts valid, which the unanchored pattern also accepted.
_cert_url_re = re.compile(
    r'sns\.([a-z]{1,3}-[a-z]+-[0-9]{1,2})\.amazonaws\.com(?:\.cn)?',
)

# Upper bound, in seconds, on the certificate download so a stalled endpoint
# cannot block the caller indefinitely.
_CERT_FETCH_TIMEOUT_SECONDS = 10

# Lifetime, in seconds, of a certificate stored in redis (60 minutes).
_CERT_CACHE_TTL_SECONDS = 60 * 60


class ValidationError(Exception):
    """
    ValidationError. Raised when a message fails integrity checks.
    """


def get_certificate(url):
    """Return the signing certificate stored at *url*, using redis as a cache.

    A value already present in ``redis_store`` under *url* is returned as-is.
    Otherwise the certificate is downloaded, stored in redis for 60 minutes
    and in the process-local cache, and returned as text.

    Raises:
        requests.RequestException: if the download times out, fails, or
            returns an HTTP error status. Nothing is cached in that case, so
            an error page is never stored as if it were a certificate.
    """
    cached = redis_store.get(url)
    if cached is not None:
        return cached

    response = requests.get(url, timeout=_CERT_FETCH_TIMEOUT_SECONDS)
    # Refuse to cache the body of a 4xx/5xx response.
    response.raise_for_status()
    cert_text = response.text

    redis_store.set(url, cert_text, ex=_CERT_CACHE_TTL_SECONDS)
    _signing_cert_cache[url] = cert_text
    return cert_text


def validate_arn(sns_payload):
    """Check that the payload's topic is one of ``VALID_SNS_TOPICS``.

    The check is skipped entirely when ``VALIDATE_SNS_TOPICS`` is falsy.
    The topic name is taken as the sixth ``:``-separated segment of
    ``TopicArn``.

    Raises:
        ValidationError: if ``TopicArn`` is missing, is not a string, has
            fewer than six segments, or names a topic that is not allowed.
    """
    if not VALIDATE_SNS_TOPICS:
        return

    arn = sns_payload.get('TopicArn')
    if not isinstance(arn, str):
        raise ValidationError("Missing or malformed TopicArn")

    arn_parts = arn.split(':')
    if len(arn_parts) < 6:
        raise ValidationError("Missing or malformed TopicArn")

    if arn_parts[5] not in VALID_SNS_TOPICS:
        raise ValidationError("Invalid Topic Name")


def get_string_to_sign(sns_payload):
    """Build the byte string that the payload's signature is verified against.

    The fields used depend on the payload ``Type``. Each included field
    contributes ``"<name>\\n<value>\\n"``, in the fixed order listed below.
    A ``Subject`` that is absent (``None``) is skipped.

    Returns:
        bytes: the encoded string to sign.

    Raises:
        ValidationError: if ``Type`` is not a recognised message type, or a
            required field is not a string.
    """
    payload_type = sns_payload.get('Type')
    if payload_type in ['SubscriptionConfirmation', 'UnsubscribeConfirmation']:
        fields = ['Message', 'MessageId', 'SubscribeURL', 'Timestamp', 'Token', 'TopicArn', 'Type']
    elif payload_type == 'Notification':
        fields = ['Message', 'MessageId', 'Subject', 'Timestamp', 'TopicArn', 'Type']
    else:
        raise ValidationError("Unexpected Message Type")

    parts = []
    for field in fields:
        field_value = sns_payload.get(field)
        if not isinstance(field_value, str):
            # Subject is the only field that may be left out of the payload.
            if field == 'Subject' and field_value is None:
                continue
            raise ValidationError(f"In {field}, found non-string value: {field_value}")
        parts.append(field + '\n' + field_value + '\n')

    return ''.join(parts).encode()


def validate_sns_cert(sns_payload):
    """
    Adapted from the solution posted at
    https://github.com/boto/boto3/issues/2508#issuecomment-992931814
    Modified to swap m2crypto for oscrypto

    Verify that *sns_payload* carries a valid signature made with a
    certificate served from an SNS host over https.

    Returns:
        bool: ``True`` when the signature verifies.

    Raises:
        ValidationError: if the payload is not a dict, has the wrong
            signature version, fails the topic check, has a missing or
            non-AWS signing-cert URL, has a missing or undecodable signature,
            or the signature does not verify.
    """
    if not isinstance(sns_payload, dict):
        raise ValidationError("Unexpected message type {!r}".format(type(sns_payload).__name__))

    # Amazon SNS currently supports signature version 1.
    if sns_payload.get('SignatureVersion') != '1':
        raise ValidationError("Wrong Signature Version (expected 1)")

    validate_arn(sns_payload)

    string_to_sign = get_string_to_sign(sns_payload)

    # Key signing cert url via Lambda and via webhook are slightly different
    if 'SigningCertUrl' in sns_payload:
        signing_cert_url = sns_payload.get('SigningCertUrl')
    else:
        signing_cert_url = sns_payload.get('SigningCertURL')
    if not isinstance(signing_cert_url, str):
        raise ValidationError("Signing cert url must be a string")

    cert_scheme, cert_netloc, *_ = urlparse(signing_cert_url)
    # The whole netloc must be an SNS host; a prefix match is not enough,
    # because the certificate is downloaded from whatever host is named here.
    if cert_scheme != 'https' or not _cert_url_re.fullmatch(cert_netloc):
        raise ValidationError("Cert does not appear to be from AWS")

    certificate = _signing_cert_cache.get(signing_cert_url)
    if certificate is None:
        certificate = get_certificate(signing_cert_url)
    if isinstance(certificate, six.text_type):
        certificate = certificate.encode()

    encoded_signature = sns_payload.get("Signature")
    if not isinstance(encoded_signature, (str, bytes)):
        raise ValidationError("Signature must be a string")
    try:
        signature = base64.b64decode(encoded_signature)
    except ValueError as err:
        # binascii.Error (bad padding / bad characters) is a ValueError.
        raise ValidationError("Signature is not valid base64") from err

    try:
        oscrypto.asymmetric.rsa_pkcs1v15_verify(
            oscrypto.asymmetric.load_certificate(certificate),
            signature,
            string_to_sign,
            "sha1"
        )
    except oscrypto.errors.SignatureError as err:
        raise ValidationError("Invalid signature") from err
    return True