Merge pull request #3300 from alphagov/multi-internal-auth-179039225

Split up authentication for internal API endpoint
This commit is contained in:
Ben Thorner
2021-08-04 14:35:54 +01:00
committed by GitHub
7 changed files with 392 additions and 453 deletions

View File

@@ -271,10 +271,16 @@ def register_blueprint(application):
def register_v2_blueprints(application):
from app.authentication.auth import requires_auth
from app.authentication.auth import (
requires_auth,
requires_govuk_alerts_auth,
)
from app.v2.broadcast.post_broadcast import (
v2_broadcast_blueprint as post_broadcast,
)
from app.v2.govuk_alerts.get_broadcasts import (
v2_govuk_alerts_blueprint as get_broadcasts,
)
from app.v2.inbound_sms.get_inbound_sms import (
v2_inbound_sms_blueprint as get_inbound_sms,
)
@@ -315,6 +321,9 @@ def register_v2_blueprints(application):
post_broadcast.before_request(requires_auth)
application.register_blueprint(post_broadcast)
get_broadcasts.before_request(requires_govuk_alerts_auth)
application.register_blueprint(get_broadcasts)
def init_app(app):

View File

@@ -49,55 +49,50 @@ class AuthError(Exception):
}
def get_auth_token(req):
auth_header = req.headers.get('Authorization', None)
if not auth_header:
raise AuthError('Unauthorized: authentication token must be provided', 401)
auth_scheme = auth_header[:7].title()
if auth_scheme != 'Bearer ':
raise AuthError('Unauthorized: authentication bearer scheme must be used', 401)
return auth_header[7:]
class InternalApiKey():
def __init__(self, client_id, secret):
self.secret = secret
self.id = client_id
self.expiry_date = None
def requires_no_auth():
pass
def requires_govuk_alerts_auth():
requires_internal_auth(current_app.config.get('GOVUK_ALERTS_CLIENT_ID'))
def requires_admin_auth():
requires_internal_auth(current_app.config.get('ADMIN_CLIENT_ID'))
def requires_internal_auth(expected_client_id):
if expected_client_id not in current_app.config.get('INTERNAL_CLIENT_API_KEYS'):
raise TypeError("Unknown client_id for internal auth")
request_helper.check_proxy_header_before_request()
auth_token = _get_auth_token(request)
client_id = _get_token_issuer(auth_token)
auth_token = get_auth_token(request)
client = __get_token_issuer(auth_token)
if client_id != expected_client_id:
raise AuthError("Unauthorized: not allowed to perform this action", 401)
if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
g.service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME')
api_keys = [
InternalApiKey(client_id, secret)
for secret in current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id]
]
for secret in current_app.config.get('API_INTERNAL_SECRETS'):
try:
decode_jwt_token(auth_token, secret)
return
except TokenExpiredError:
raise AuthError("Invalid token: expired, check that your system clock is accurate", 403)
except TokenDecodeError:
# TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions
# (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather
# than continue on to check the next admin client secret
continue
# Either there are no admin client secrets or their token didn't match one of them so error
raise AuthError("Unauthorized: admin authentication token not found", 401)
else:
raise AuthError('Unauthorized: admin authentication token required', 401)
_decode_jwt_token(auth_token, api_keys, client_id)
g.service_id = client_id
def requires_auth():
request_helper.check_proxy_header_before_request()
auth_token = get_auth_token(request)
issuer = __get_token_issuer(auth_token) # ie the `iss` claim which should be a service ID
auth_token = _get_auth_token(request)
issuer = _get_token_issuer(auth_token) # ie the `iss` claim which should be a service ID
try:
service_id = uuid.UUID(issuer)
@@ -116,15 +111,23 @@ def requires_auth():
if not service.active:
raise AuthError("Invalid token: service is archived", 403, service_id=service.id)
for api_key in service.api_keys:
api_key = _decode_jwt_token(auth_token, service.api_keys, service.id)
g.api_user = api_key
g.service_id = service_id
g.authenticated_service = service
def _decode_jwt_token(auth_token, api_keys, service_id=None):
for api_key in api_keys:
try:
decode_jwt_token(auth_token, api_key.secret)
except TokenExpiredError:
err_msg = "Error: Your system clock must be accurate to within 30 seconds"
raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id)
raise AuthError(err_msg, 403, service_id=service_id, api_key_id=api_key.id)
except TokenAlgorithmError:
err_msg = "Invalid token: algorithm used is not HS256"
raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id)
raise AuthError(err_msg, 403, service_id=service_id, api_key_id=api_key.id)
except TokenDecodeError:
# we attempted to validate the token but it failed meaning it was not signed using this api key.
# Let's try the next one
@@ -134,28 +137,38 @@ def requires_auth():
continue
except TokenError:
# General error when trying to decode and validate the token
raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403, service_id=service.id, api_key_id=api_key.id)
raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403, service_id=service_id, api_key_id=api_key.id)
if api_key.expiry_date:
raise AuthError("Invalid token: API key revoked", 403, service_id=service.id, api_key_id=api_key.id)
g.service_id = service.id
g.api_user = api_key
g.authenticated_service = service
raise AuthError("Invalid token: API key revoked", 403, service_id=service_id, api_key_id=api_key.id)
current_app.logger.info('API authorised for service {} with api key {}, using issuer {} for URL: {}'.format(
service.id,
service_id,
api_key.id,
request.headers.get('User-Agent'),
request.base_url
))
return
return api_key
else:
# service has API keys, but none matching the one the user provided
raise AuthError("Invalid token: API key not found", 403, service_id=service.id)
raise AuthError("Invalid token: API key not found", 403, service_id=service_id)
def __get_token_issuer(auth_token):
def _get_auth_token(req):
auth_header = req.headers.get('Authorization', None)
if not auth_header:
raise AuthError('Unauthorized: authentication token must be provided', 401)
auth_scheme = auth_header[:7].title()
if auth_scheme != 'Bearer ':
raise AuthError('Unauthorized: authentication bearer scheme must be used', 401)
return auth_header[7:]
def _get_token_issuer(auth_token):
try:
issuer = get_token_issuer(auth_token)
except TokenIssuerError:

View File

@@ -84,9 +84,17 @@ class Config(object):
# URL of api app (on AWS this is the internal api endpoint)
API_HOST_NAME = os.getenv('API_HOST_NAME')
# secrets that internal apps, such as the admin app or document download, must use to authenticate with the API
# LEGACY: replacing with INTERNAL_CLIENT_API_KEYS
API_INTERNAL_SECRETS = json.loads(os.environ.get('API_INTERNAL_SECRETS', '[]'))
# secrets that internal apps, such as the admin app or document download, must use to authenticate with the API
ADMIN_CLIENT_ID = 'notify-admin'
GOVUK_ALERTS_CLIENT_ID = 'govuk-alerts'
INTERNAL_CLIENT_API_KEYS = {
ADMIN_CLIENT_ID: API_INTERNAL_SECRETS
}
# encyption secret/salt
SECRET_KEY = os.getenv('SECRET_KEY')
DANGEROUS_SALT = os.getenv('DANGEROUS_SALT')
@@ -129,7 +137,6 @@ class Config(object):
###########################
NOTIFY_ENVIRONMENT = 'development'
ADMIN_CLIENT_USER_NAME = 'notify-admin'
AWS_REGION = 'eu-west-1'
INVITATION_EXPIRATION_DAYS = 2
NOTIFY_APP_NAME = 'api'
@@ -399,7 +406,11 @@ class Development(Config):
TRANSIENT_UPLOADED_LETTERS = 'development-transient-uploaded-letters'
LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise'
API_INTERNAL_SECRETS = ['dev-notify-secret-key']
INTERNAL_CLIENT_API_KEYS = {
Config.ADMIN_CLIENT_ID: ['dev-notify-secret-key'],
Config.GOVUK_ALERTS_CLIENT_ID: ['govuk-alerts-secret-key']
}
SECRET_KEY = 'dev-notify-secret-key'
DANGEROUS_SALT = 'dev-notify-salt'

View File

@@ -0,0 +1,11 @@
from flask import Blueprint
from app.v2.errors import register_errors
v2_govuk_alerts_blueprint = Blueprint(
"v2_govuk-alerts_blueprint",
__name__,
url_prefix='/v2/govuk-alerts',
)
register_errors(v2_govuk_alerts_blueprint)

View File

@@ -0,0 +1,8 @@
from flask import jsonify
from app.v2.govuk_alerts import v2_govuk_alerts_blueprint
@v2_govuk_alerts_blueprint.route('')
def get_broadcasts():
return jsonify({})