mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-24 17:31:34 -05:00
Merge pull request #2721 from alphagov/rotate-keys
Allow multiple keys for the `ADMIN_CLIENT_SECRET`
This commit is contained in:
@@ -61,7 +61,21 @@ def requires_admin_auth():
|
||||
|
||||
if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'):
|
||||
g.service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME')
|
||||
return handle_admin_key(auth_token, current_app.config.get('ADMIN_CLIENT_SECRET'))
|
||||
|
||||
for secret in current_app.config.get('API_INTERNAL_SECRETS'):
|
||||
try:
|
||||
decode_jwt_token(auth_token, secret)
|
||||
return
|
||||
except TokenExpiredError:
|
||||
raise AuthError("Invalid token: expired, check that your system clock is accurate", 403)
|
||||
except TokenDecodeError:
|
||||
# TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions
|
||||
# (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather
|
||||
# than continue on to check the next admin client secret
|
||||
continue
|
||||
|
||||
# Either there are no admin client secrets or their token didn't match one of them so error
|
||||
raise AuthError("Unauthorized: admin authentication token not found", 401)
|
||||
else:
|
||||
raise AuthError('Unauthorized: admin authentication token required', 401)
|
||||
|
||||
@@ -130,12 +144,3 @@ def __get_token_issuer(auth_token):
|
||||
except TokenDecodeError:
|
||||
raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403)
|
||||
return issuer
|
||||
|
||||
|
||||
def handle_admin_key(auth_token, secret):
|
||||
try:
|
||||
decode_jwt_token(auth_token, secret)
|
||||
except TokenExpiredError:
|
||||
raise AuthError("Invalid token: expired, check that your system clock is accurate", 403)
|
||||
except TokenDecodeError:
|
||||
raise AuthError("Invalid token: could not decode your API token", 403)
|
||||
|
||||
@@ -64,8 +64,8 @@ class Config(object):
|
||||
# URL of api app (on AWS this is the internal api endpoint)
|
||||
API_HOST_NAME = os.getenv('API_HOST_NAME')
|
||||
|
||||
# admin app api key
|
||||
ADMIN_CLIENT_SECRET = os.getenv('ADMIN_CLIENT_SECRET')
|
||||
# secrets that internal apps, such as the admin app or document download, must use to authenticate with the API
|
||||
API_INTERNAL_SECRETS = json.loads(os.environ.get('API_INTERNAL_SECRETS', '[]'))
|
||||
|
||||
# encyption secret/salt
|
||||
SECRET_KEY = os.getenv('SECRET_KEY')
|
||||
@@ -369,7 +369,7 @@ class Development(Config):
|
||||
TRANSIENT_UPLOADED_LETTERS = 'development-transient-uploaded-letters'
|
||||
LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise'
|
||||
|
||||
ADMIN_CLIENT_SECRET = 'dev-notify-secret-key'
|
||||
API_INTERNAL_SECRETS = ['dev-notify-secret-key']
|
||||
SECRET_KEY = 'dev-notify-secret-key'
|
||||
DANGEROUS_SALT = 'dev-notify-salt'
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ applications:
|
||||
|
||||
# Credentials variables
|
||||
ADMIN_BASE_URL: '{{ ADMIN_BASE_URL }}'
|
||||
ADMIN_CLIENT_SECRET: '{{ ADMIN_CLIENT_SECRET }}'
|
||||
API_INTERNAL_SECRETS: '{{ API_INTERNAL_SECRETS }}'
|
||||
API_HOST_NAME: '{{ API_HOST_NAME }}'
|
||||
DANGEROUS_SALT: '{{ DANGEROUS_SALT }}'
|
||||
SECRET_KEY: '{{ SECRET_KEY }}'
|
||||
|
||||
@@ -28,7 +28,7 @@ def create_authorization_header(service_id=None, key_type=KEY_TYPE_NORMAL):
|
||||
|
||||
else:
|
||||
client_id = current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
secret = current_app.config['ADMIN_CLIENT_SECRET']
|
||||
secret = current_app.config['API_INTERNAL_SECRETS'][0]
|
||||
|
||||
token = create_jwt_token(secret=secret, client_id=client_id)
|
||||
return 'Authorization', 'Bearer {}'.format(token)
|
||||
|
||||
@@ -104,8 +104,9 @@ def test_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_a
|
||||
assert exc.value.short_message == 'Invalid token: algorithm used is not HS256'
|
||||
|
||||
|
||||
def test_admin_auth_should_not_allow_request_with_no_iat(client, sample_api_key):
|
||||
def test_admin_auth_should_not_allow_request_with_no_iat(client):
|
||||
iss = current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
secret = current_app.config['API_INTERNAL_SECRETS'][0]
|
||||
|
||||
# code copied from notifications_python_client.authentication.py::create_jwt_token
|
||||
headers = {
|
||||
@@ -118,12 +119,35 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client, sample_api_key)
|
||||
# 'iat': not provided
|
||||
}
|
||||
|
||||
token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers).decode()
|
||||
token = jwt.encode(payload=claims, key=secret, headers=headers).decode()
|
||||
|
||||
request.headers = {'Authorization': 'Bearer {}'.format(token)}
|
||||
with pytest.raises(AuthError) as exc:
|
||||
requires_admin_auth()
|
||||
assert exc.value.short_message == "Invalid token: could not decode your API token"
|
||||
assert exc.value.short_message == "Unauthorized: admin authentication token not found"
|
||||
|
||||
|
||||
def test_admin_auth_should_not_allow_request_with_old_iat(client):
|
||||
iss = current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
secret = current_app.config['API_INTERNAL_SECRETS'][0]
|
||||
|
||||
# code copied from notifications_python_client.authentication.py::create_jwt_token
|
||||
headers = {
|
||||
"typ": 'JWT',
|
||||
"alg": 'HS256'
|
||||
}
|
||||
|
||||
claims = {
|
||||
'iss': iss,
|
||||
'iat': int(time.time()) - 60
|
||||
}
|
||||
|
||||
token = jwt.encode(payload=claims, key=secret, headers=headers).decode()
|
||||
|
||||
request.headers = {'Authorization': 'Bearer {}'.format(token)}
|
||||
with pytest.raises(AuthError) as exc:
|
||||
requires_admin_auth()
|
||||
assert exc.value.short_message == "Invalid token: expired, check that your system clock is accurate"
|
||||
|
||||
|
||||
def test_auth_should_not_allow_request_with_extra_claims(client, sample_api_key):
|
||||
@@ -188,11 +212,28 @@ def test_should_allow_valid_token_for_request_with_path_params_for_public_url(cl
|
||||
|
||||
|
||||
def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(client):
|
||||
token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRET'], current_app.config['ADMIN_CLIENT_USER_NAME'])
|
||||
token = create_jwt_token(
|
||||
current_app.config['API_INTERNAL_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
)
|
||||
response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_should_allow_valid_token_for_request_with_path_params_for_admin_url_with_second_secret(client):
|
||||
with set_config(client.application, 'API_INTERNAL_SECRETS', ["secret1", "secret2"]):
|
||||
token = create_jwt_token(
|
||||
current_app.config['API_INTERNAL_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
)
|
||||
response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
token = create_jwt_token(
|
||||
current_app.config['API_INTERNAL_SECRETS'][1], current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
)
|
||||
response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key):
|
||||
data = {'service': sample_api_key.service,
|
||||
'name': 'some key name',
|
||||
@@ -264,35 +305,35 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_
|
||||
|
||||
|
||||
def test_authentication_returns_error_when_admin_client_has_no_secrets(client):
|
||||
api_secret = current_app.config.get('ADMIN_CLIENT_SECRET')
|
||||
api_secret = current_app.config.get('API_INTERNAL_SECRETS')[0]
|
||||
api_service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME')
|
||||
token = create_jwt_token(
|
||||
secret=api_secret,
|
||||
client_id=api_service_id
|
||||
)
|
||||
with set_config(client.application, 'ADMIN_CLIENT_SECRET', ''):
|
||||
with set_config(client.application, 'API_INTERNAL_SECRETS', []):
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 403
|
||||
assert response.status_code == 401
|
||||
error_message = json.loads(response.get_data())
|
||||
assert error_message['message'] == {"token": ["Invalid token: could not decode your API token"]}
|
||||
assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]}
|
||||
|
||||
|
||||
def test_authentication_returns_error_when_admin_client_secret_is_invalid(client):
|
||||
api_secret = current_app.config.get('ADMIN_CLIENT_SECRET')
|
||||
api_secret = current_app.config.get('API_INTERNAL_SECRETS')[0]
|
||||
token = create_jwt_token(
|
||||
secret=api_secret,
|
||||
client_id=current_app.config.get('ADMIN_CLIENT_USER_NAME')
|
||||
)
|
||||
current_app.config['ADMIN_CLIENT_SECRET'] = 'something-wrong'
|
||||
current_app.config['API_INTERNAL_SECRETS'][0] = 'something-wrong'
|
||||
response = client.get(
|
||||
'/service',
|
||||
headers={'Authorization': 'Bearer {}'.format(token)})
|
||||
assert response.status_code == 403
|
||||
assert response.status_code == 401
|
||||
error_message = json.loads(response.get_data())
|
||||
assert error_message['message'] == {"token": ["Invalid token: could not decode your API token"]}
|
||||
current_app.config['ADMIN_CLIENT_SECRET'] = api_secret
|
||||
assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]}
|
||||
current_app.config['API_INTERNAL_SECRETS'][0] = api_secret
|
||||
|
||||
|
||||
def test_authentication_returns_error_when_service_doesnt_exit(
|
||||
@@ -397,7 +438,9 @@ def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_valu
|
||||
(False, 'wrong_key', 200),
|
||||
])
|
||||
def test_proxy_key_on_admin_auth_endpoint(notify_api, check_proxy_header, header_value, expected_status):
|
||||
token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRET'], current_app.config['ADMIN_CLIENT_USER_NAME'])
|
||||
token = create_jwt_token(
|
||||
current_app.config['API_INTERNAL_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']
|
||||
)
|
||||
|
||||
with set_config_values(notify_api, {
|
||||
'ROUTE_SECRET_KEY_1': 'key_1',
|
||||
|
||||
Reference in New Issue
Block a user