mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-16 02:02:13 -05:00
Support granular API auth for internal apps
Previously we just had a single array of API keys / secrets, any of which could be used to get past the "requires_admin_auth" check. While multiple keys are necessary to allow for rotation, we should avoid giving other apps access this way (too much privilege). This converts the existing config vars into a new dictionary, keyed by client_id. We can then use the dictionary to scope auth for new API consumers like gov.uk/alerts to just the endpoints they need to access, while maintaining existing access for the Admin app. Once the new dictionary is available as a JSON environment variable, we'll be able to remove the old credentials / config. In the next commits, we'll look at more tests for the new functionality.
This commit is contained in:
@@ -67,30 +67,37 @@ def requires_no_auth():
|
||||
|
||||
|
||||
def requires_admin_auth():
|
||||
requires_internal_auth(current_app.config.get('ADMIN_CLIENT_USER_NAME'))
|
||||
|
||||
|
||||
def requires_internal_auth(expected_client_id):
|
||||
if expected_client_id not in current_app.config.get('INTERNAL_CLIENT_API_KEYS'):
|
||||
raise TypeError("Unknown client_id for internal auth")
|
||||
|
||||
request_helper.check_proxy_header_before_request()
|
||||
|
||||
auth_token = get_auth_token(request)
|
||||
client = __get_token_issuer(auth_token)
|
||||
client_id = __get_token_issuer(auth_token)
|
||||
|
||||
if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
|
||||
g.service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME')
|
||||
if client_id != expected_client_id:
|
||||
raise AuthError("Unauthorized: not allowed to perform this action", 401)
|
||||
|
||||
for secret in current_app.config.get('API_INTERNAL_SECRETS'):
|
||||
try:
|
||||
decode_jwt_token(auth_token, secret)
|
||||
return
|
||||
except TokenExpiredError:
|
||||
raise AuthError("Invalid token: expired, check that your system clock is accurate", 403)
|
||||
except TokenDecodeError:
|
||||
# TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions
|
||||
# (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather
|
||||
# than continue on to check the next admin client secret
|
||||
continue
|
||||
g.service_id = client_id
|
||||
secrets = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id]
|
||||
|
||||
# Either there are no admin client secrets or their token didn't match one of them so error
|
||||
raise AuthError("Unauthorized: admin authentication token not found", 401)
|
||||
else:
|
||||
raise AuthError('Unauthorized: admin authentication token required', 401)
|
||||
for secret in secrets:
|
||||
try:
|
||||
decode_jwt_token(auth_token, secret)
|
||||
return
|
||||
except TokenExpiredError:
|
||||
raise AuthError("Invalid token: expired, check that your system clock is accurate", 403)
|
||||
except TokenDecodeError:
|
||||
# TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions
|
||||
# (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather
|
||||
# than continue on to check the next admin client secret
|
||||
continue
|
||||
|
||||
# Either there are no admin client secrets or their token didn't match one of them so error
|
||||
raise AuthError("Unauthorized: API authentication token not found", 401)
|
||||
|
||||
|
||||
def requires_auth():
|
||||
|
||||
Reference in New Issue
Block a user