This commit is contained in:
Kenneth Kehl
2025-06-10 10:36:45 -07:00
parent 34434bda57
commit 9c42927c59
7 changed files with 561 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
# Version numbering follows Semantic Versionning:
#
# Given a version number MAJOR.MINOR.PATCH, increment the:
# - MAJOR version when you make incompatible API changes,
# - MINOR version when you add functionality in a backwards-compatible manner, and
# - PATCH version when you make backwards-compatible bug fixes.
#
# -- http://semver.org/
__version__ = "10.0.1"
from notifications_python_client.errors import ( # noqa
REQUEST_ERROR_MESSAGE,
REQUEST_ERROR_STATUS_CODE,
)
from notifications_python_client.notifications import NotificationsAPIClient # noqa
from notifications_python_client.utils import prepare_upload # noqa

View File

@@ -0,0 +1,153 @@
import calendar
import time
import jwt
from notifications_python_client.errors import (
TokenAlgorithmError,
TokenDecodeError,
TokenError,
TokenExpiredError,
TokenIssuedAtError,
TokenIssuerError,
)
__algorithm__ = "HS256"
__type__ = "JWT"
__bound__ = 30
INVALID_FUTURE_TOKEN_ERROR_MESSAGE = "Token can not be in the future"
def create_jwt_token(secret, client_id):
"""
Create JWT token for GOV.UK Notify
Tokens have standard header:
{
"typ": "JWT",
"alg": "HS256"
}
Claims consist of:
iss: identifier for the client
iat: issued at in epoch seconds (UTC)
:param secret: Application signing secret
:param client_id: Identifier for the client
:return: JWT token for this request
"""
assert secret, "Missing secret key"
assert client_id, "Missing client id"
headers = {"typ": __type__, "alg": __algorithm__}
claims = {"iss": client_id, "iat": epoch_seconds()}
t = jwt.encode(payload=claims, key=secret, headers=headers)
if isinstance(t, str):
return t
else:
return t.decode()
def get_token_issuer(token):
"""
Issuer of a token is the identifier used to recover the secret
Need to extract this from token to ensure we can proceed to the signature validation stage
Does not check validity of the token
:param token: signed JWT token
:return issuer: iss field of the JWT token
:raises TokenIssuerError: if iss field not present
:raises TokenDecodeError: if token does not conform to JWT spec
"""
try:
unverified = decode_token(token)
if "iss" not in unverified:
raise TokenIssuerError
return unverified.get("iss")
except jwt.DecodeError as e:
raise TokenDecodeError from e
def decode_jwt_token(token, secret):
"""
Validates and decodes the JWT token
Token checked for
- signature of JWT token
- token issued date is valid
:param token: jwt token
:param secret: client specific secret
:return boolean: True if valid token, False otherwise
:raises TokenIssuerError: if iss field not present
:raises TokenIssuedAtError: if iat field not present
:raises TokenExpiredError: If the iat value expires this token
:raises TokenDecodeError: If the token cannot be decoded because it failed validation
:raises TokenAlgorithmError: If the algorithm is not recognised
:raises TokenError: If any other type of jwt exception is raised when trying jwt.decode
"""
try:
# check signature of the token
decoded_token = jwt.decode(
token,
key=secret,
options={"verify_signature": True},
algorithms=[__algorithm__],
leeway=__bound__,
)
return validate_jwt_token(decoded_token)
except jwt.InvalidIssuedAtError as e:
raise TokenExpiredError(
"Token has invalid iat field", decode_token(token)
) from e
except jwt.ImmatureSignatureError as e:
raise TokenExpiredError(
INVALID_FUTURE_TOKEN_ERROR_MESSAGE, decode_token(token)
) from e
except jwt.DecodeError as e:
raise TokenDecodeError from e
except jwt.InvalidAlgorithmError as e:
raise TokenAlgorithmError from e
except jwt.InvalidTokenError as e:
# At this point, we have not caught a specific exception we care about enough to show
# a precise error message (ie something to do with the iat, iss or alg fields).
# If there is a different reason our token is invalid we will throw a generic error as we
# don't wish to provide exact messages for every type of error that jwt might encounter.
# https://github.com/jpadilla/pyjwt/blob/master/jwt/exceptions.py
# https://pyjwt.readthedocs.io/en/latest/api.html#exceptions
raise TokenError from e
def validate_jwt_token(decoded_token):
# token has all the required fields
if "iss" not in decoded_token:
raise TokenIssuerError
if "iat" not in decoded_token:
raise TokenIssuedAtError
# check iat time is within bounds
now = epoch_seconds()
iat = int(decoded_token["iat"])
if now > (iat + __bound__):
raise TokenExpiredError("Token has expired", decoded_token)
if iat > (now + __bound__):
raise TokenExpiredError(INVALID_FUTURE_TOKEN_ERROR_MESSAGE, decoded_token)
return True
def decode_token(token):
"""
Decode token but don;t check the signature
:param token:
:return decoded token:
"""
return jwt.decode(
token, options={"verify_signature": False}, algorithms=[__algorithm__]
)
def epoch_seconds():
return calendar.timegm(time.gmtime())

View File

@@ -0,0 +1,126 @@
import json
import logging
import time
import urllib.parse
import requests
from notifications_python_client import __version__
from notifications_python_client.authentication import create_jwt_token
from notifications_python_client.errors import HTTPError, InvalidResponse
logger = logging.getLogger(__name__)
class BaseAPIClient:
"""
Base class for GOV.UK Notify API client.
This class is not thread-safe.
"""
def __init__(
self, api_key, base_url="https://api.notifications.service.gov.uk", timeout=30
):
"""
Initialise the client
Error if either of base_url or secret missing
:param base_url - base URL of GOV.UK Notify API:
:param secret - application secret - used to sign the request:
:param timeout - request timeout on the client
:return:
"""
service_id = api_key[-73:-37]
api_key = api_key[-36:]
assert base_url, "Missing base url"
assert service_id, "Missing service ID"
assert api_key, "Missing API key"
self.base_url = base_url
self.service_id = service_id
self.api_key = api_key
self.timeout = timeout
self.request_session = requests.Session()
def put(self, url, data):
return self.request("PUT", url, data=data)
def get(self, url, params=None):
return self.request("GET", url, params=params)
def post(self, url, data):
return self.request("POST", url, data=data)
def delete(self, url, data=None):
return self.request("DELETE", url, data=data)
def generate_headers(self, api_token):
return {
"Content-type": "application/json",
"Authorization": f"Bearer {api_token}",
"User-agent": f"NOTIFY-API-PYTHON-CLIENT/{__version__}",
}
def request(self, method, url, data=None, params=None):
logger.debug("API request %s %s", method, url)
url, kwargs = self._create_request_objects(url, data, params)
response = self._perform_request(method, url, kwargs)
return self._process_json_response(response)
def _create_request_objects(self, url, data, params):
api_token = create_jwt_token(self.api_key, self.service_id)
kwargs = {"headers": self.generate_headers(api_token), "timeout": self.timeout}
if data is not None:
kwargs.update(data=self._serialize_data(data))
if params is not None:
kwargs.update(params=params)
url = urllib.parse.urljoin(str(self.base_url), str(url))
return url, kwargs
def _serialize_data(self, data):
return json.dumps(data, default=self._extended_json_encoder)
def _extended_json_encoder(self, obj):
if isinstance(obj, set):
return list(obj)
raise TypeError
def _perform_request(self, method, url, kwargs):
start_time = time.monotonic()
try:
response = self.request_session.request(method, url, **kwargs)
response.raise_for_status()
return response
except requests.RequestException as e:
api_error = HTTPError.create(e)
logger.warning(
"API %s request on %s failed with %s '%s'",
method,
url,
api_error.status_code,
api_error.message,
)
raise api_error from e
finally:
elapsed_time = time.monotonic() - start_time
logger.debug(
"API %s request on %s finished in %s", method, url, elapsed_time
)
def _process_json_response(self, response):
try:
if response.status_code == 204:
return
return response.json()
except ValueError as e:
raise InvalidResponse(
response, message="No JSON response object could be decoded"
) from e

View File

@@ -0,0 +1,90 @@
from typing import List, Union # noqa: UP035 Python <3.10 compatibility
from requests import RequestException, Response
REQUEST_ERROR_STATUS_CODE = 503
REQUEST_ERROR_MESSAGE = "Request failed"
TOKEN_ERROR_GUIDANCE = "See our requirements for JSON Web Tokens \
at https://docs.notifications.service.gov.uk/rest-api.html#authorisation-header"
TOKEN_ERROR_DEFAULT_ERROR_MESSAGE = "Invalid token: " + TOKEN_ERROR_GUIDANCE
class TokenError(Exception):
def __init__(self, message=None, token=None):
self.message = (
message + ". " + TOKEN_ERROR_GUIDANCE
if message
else TOKEN_ERROR_DEFAULT_ERROR_MESSAGE
)
self.token = token
class TokenExpiredError(TokenError):
pass
class TokenAlgorithmError(TokenError):
def __init__(self):
super().__init__("Invalid token: algorithm used is not HS256")
class TokenDecodeError(TokenError):
def __init__(self, message=None):
super().__init__(message or "Invalid token: signature")
class TokenIssuerError(TokenDecodeError):
def __init__(self):
super().__init__("Invalid token: iss field not provided")
class TokenIssuedAtError(TokenDecodeError):
def __init__(self):
super().__init__("Invalid token: iat field not provided")
class APIError(Exception):
def __init__(self, response: Response = None, message: str = None):
self.response = response
self._message = message
def __str__(self):
return f"{self.status_code} - {self.message}"
@property
def message(
self,
) -> Union[str, List[dict]]: # noqa: UP006, UP007 Python <3.10 compatibility
try:
json_resp = self.response.json() # type: ignore
return json_resp.get("message", json_resp.get("errors"))
except (TypeError, ValueError, AttributeError, KeyError):
return self._message or REQUEST_ERROR_MESSAGE
@property
def status_code(self) -> int:
try:
return self.response.status_code # type: ignore
except AttributeError:
return REQUEST_ERROR_STATUS_CODE
class HTTPError(APIError):
@staticmethod
def create(e: RequestException) -> "HTTPError":
error = HTTPError(e.response)
if error.status_code == 503:
error = HTTP503Error(e.response)
return error
class HTTP503Error(HTTPError):
"""Specific instance of HTTPError for 503 errors
Used for detecting whether failed requests should be retried.
"""
class InvalidResponse(APIError):
pass

View File

@@ -0,0 +1,154 @@
import base64
import logging
import re
from io import BytesIO
from notifications_python_client.base import BaseAPIClient
logger = logging.getLogger(__name__)
class NotificationsAPIClient(BaseAPIClient):
def send_sms_notification(
self,
phone_number,
template_id,
personalisation=None,
reference=None,
sms_sender_id=None,
):
notification = {"phone_number": phone_number, "template_id": template_id}
if personalisation:
notification.update({"personalisation": personalisation})
if reference:
notification.update({"reference": reference})
if sms_sender_id:
notification.update({"sms_sender_id": sms_sender_id})
return self.post("/v2/notifications/sms", data=notification)
def send_email_notification(
self,
email_address,
template_id,
personalisation=None,
reference=None,
email_reply_to_id=None,
one_click_unsubscribe_url=None,
):
notification = {"email_address": email_address, "template_id": template_id}
if personalisation:
notification.update({"personalisation": personalisation})
if reference:
notification.update({"reference": reference})
if email_reply_to_id:
notification.update({"email_reply_to_id": email_reply_to_id})
if one_click_unsubscribe_url:
notification.update(
{"one_click_unsubscribe_url": one_click_unsubscribe_url}
)
return self.post("/v2/notifications/email", data=notification)
def send_letter_notification(self, template_id, personalisation, reference=None):
notification = {"template_id": template_id, "personalisation": personalisation}
if reference:
notification.update({"reference": reference})
return self.post("/v2/notifications/letter", data=notification)
def send_precompiled_letter_notification(self, reference, pdf_file, postage=None):
content = base64.b64encode(pdf_file.read()).decode("utf-8")
notification = {"reference": reference, "content": content}
if postage:
notification["postage"] = postage
return self.post("/v2/notifications/letter", data=notification)
def get_received_texts(self, older_than=None):
if older_than:
query_string = f"?older_than={older_than}"
else:
query_string = ""
return self.get(f"/v2/received-text-messages{query_string}")
def get_received_texts_iterator(self, older_than=None):
result = self.get_received_texts(older_than=older_than)
received_texts = result.get("received_text_messages")
while received_texts:
yield from received_texts
next_link = result["links"].get("next")
received_text_id = re.search(
"[0-F]{8}-[0-F]{4}-[0-F]{4}-[0-F]{4}-[0-F]{12}", next_link, re.I
).group(0)
result = self.get_received_texts(older_than=received_text_id)
received_texts = result.get("received_text_messages")
def get_notification_by_id(self, id):
return self.get(f"/v2/notifications/{id}")
def get_pdf_for_letter(self, id):
url = f"/v2/notifications/{id}/pdf"
logger.debug("API request %s %s", "GET", url)
url, kwargs = self._create_request_objects(url, data=None, params=None)
response = self._perform_request("GET", url, kwargs)
return BytesIO(response.content)
def get_all_notifications(
self,
status=None,
template_type=None,
reference=None,
older_than=None,
include_jobs=None,
):
data = {}
if status:
data.update({"status": status})
if template_type:
data.update({"template_type": template_type})
if reference:
data.update({"reference": reference})
if older_than:
data.update({"older_than": older_than})
if include_jobs:
data.update({"include_jobs": include_jobs})
return self.get("/v2/notifications", params=data)
def get_all_notifications_iterator(
self, status=None, template_type=None, reference=None, older_than=None
):
result = self.get_all_notifications(
status, template_type, reference, older_than
)
notifications = result.get("notifications")
while notifications:
yield from notifications
next_link = result["links"].get("next")
notification_id = re.search(
"[0-F]{8}-[0-F]{4}-[0-F]{4}-[0-F]{4}-[0-F]{12}", next_link, re.I
).group(0)
result = self.get_all_notifications(
status, template_type, reference, notification_id
)
notifications = result.get("notifications")
def post_template_preview(self, template_id, personalisation):
template = {"personalisation": personalisation}
return self.post(f"/v2/template/{template_id}/preview", data=template)
def get_template(self, template_id):
return self.get(f"/v2/template/{template_id}")
def get_template_version(self, template_id, version):
return self.get(f"/v2/template/{template_id}/version/{version}")
def get_all_template_versions(self, template_id):
return self.get(f"service/{self.service_id}/template/{template_id}/versions")
def get_all_templates(self, template_type=None):
_template_type = f"?type={template_type}" if template_type else ""
return self.get(f"/v2/templates{_template_type}")

View File

View File

@@ -0,0 +1,21 @@
import base64
DOCUMENT_UPLOAD_SIZE_LIMIT = 2 * 1024 * 1024
def prepare_upload(
f, filename=None, confirm_email_before_download=None, retention_period=None
):
contents = f.read()
if len(contents) > DOCUMENT_UPLOAD_SIZE_LIMIT:
raise ValueError("File is larger than 2MB")
file_data = {
"file": base64.b64encode(contents).decode("ascii"),
"filename": filename,
"confirm_email_before_download": confirm_email_before_download,
"retention_period": retention_period,
}
return file_data