From 724630644732190bbeb692620fc41e1841a471d8 Mon Sep 17 00:00:00 2001 From: David McDonald Date: Wed, 19 Feb 2020 17:50:00 +0000 Subject: [PATCH] Support multiple secrets for ADMIN_CLIENT_SECRETS This will allow us to accept two different ones and therefore allow us to rotate the secret that the admin client is sending to the API Due to how the notifications-python-client throws exceptions, we run into exactly the same issue with not being able to distinguish if a `TokenDecodeError` is thrown because the token was encrypted with a different secret key or if because there was a different error when decoding. I've copied the TODO from `requires_auth` as this is exactly the same issue. I've also added a test case for functionality that was missing for an out of date admin token (old IAT). --- app/authentication/auth.py | 25 +++++---- app/config.py | 4 +- .../app/authentication/test_authentication.py | 56 ++++++++++++++++--- 3 files changed, 64 insertions(+), 21 deletions(-) diff --git a/app/authentication/auth.py b/app/authentication/auth.py index c1b262b1c..4f85936f6 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -62,11 +62,21 @@ def requires_admin_auth(): if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'): g.service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') - secret = "" if len(current_app.config.get('ADMIN_CLIENT_SECRETS')): - secret = current_app.config.get('ADMIN_CLIENT_SECRETS')[0] + for secret in current_app.config.get('ADMIN_CLIENT_SECRETS'): + try: + decode_jwt_token(auth_token, secret) + return + except TokenExpiredError: + raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) + except TokenDecodeError: + # TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions + # (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather + # than continue on to check the next API key + continue - return handle_admin_key(auth_token, secret) + # Either there are no admin client secrets or their token didn't match one of them so error + raise AuthError("Unauthorized: admin authentication token not found", 401) else: raise AuthError('Unauthorized: admin authentication token required', 401) @@ -135,12 +145,3 @@ def __get_token_issuer(auth_token): except TokenDecodeError: raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403) return issuer - - -def handle_admin_key(auth_token, secret): - try: - decode_jwt_token(auth_token, secret) - except TokenExpiredError: - raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) - except TokenDecodeError: - raise AuthError("Invalid token: could not decode your API token", 403) diff --git a/app/config.py b/app/config.py index 4a716d037..a86cdd401 100644 --- a/app/config.py +++ b/app/config.py @@ -64,8 +64,8 @@ class Config(object): # URL of api app (on AWS this is the internal api endpoint) API_HOST_NAME = os.getenv('API_HOST_NAME') - # admin app api key - ADMIN_CLIENT_SECRETS = [os.getenv('ADMIN_CLIENT_SECRET')] if os.getenv('ADMIN_CLIENT_SECRET') else [] + # admin app api keys + ADMIN_CLIENT_SECRETS = json.loads(os.environ.get('ADMIN_CLIENT_SECRETS', '[]')) # encyption secret/salt SECRET_KEY = os.getenv('SECRET_KEY') diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 252c42739..9bc6ed1ab 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -123,7 +123,30 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client, sample_api_key) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: requires_admin_auth() - assert exc.value.short_message == "Invalid token: could not decode your API token" + assert exc.value.short_message == "Unauthorized: admin authentication token not found" + + +def test_admin_auth_should_not_allow_request_with_old_iat(client): + iss = current_app.config['ADMIN_CLIENT_USER_NAME'] + secret = current_app.config['ADMIN_CLIENT_SECRETS'][0] + + # code copied from notifications_python_client.authentication.py::create_jwt_token + headers = { + "typ": 'JWT', + "alg": 'HS256' + } + + claims = { + 'iss': iss, + 'iat': int(time.time()) - 60 + } + + token = jwt.encode(payload=claims, key=secret, headers=headers).decode() + + request.headers = {'Authorization': 'Bearer {}'.format(token)} + with pytest.raises(AuthError) as exc: + requires_admin_auth() + assert exc.value.short_message == "Invalid token: expired, check that your system clock is accurate" def test_auth_should_not_allow_request_with_extra_claims(client, sample_api_key): @@ -188,11 +211,28 @@ def test_should_allow_valid_token_for_request_with_path_params_for_public_url(cl def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(client): - token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']) + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 +def test_should_allow_valid_token_for_request_with_path_params_for_admin_url_with_second_secret(client): + with set_config(client.application, 'ADMIN_CLIENT_SECRETS', ["secret1", "secret2"]): + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 + + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][1], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 + + def test_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key): data = {'service': sample_api_key.service, 'name': 'some key name', @@ -274,9 +314,9 @@ def test_authentication_returns_error_when_admin_client_has_no_secrets(client): response = client.get( '/service', headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 + assert response.status_code == 401 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Invalid token: could not decode your API token"]} + assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]} def test_authentication_returns_error_when_admin_client_secret_is_invalid(client): @@ -289,9 +329,9 @@ def test_authentication_returns_error_when_admin_client_secret_is_invalid(client response = client.get( '/service', headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 + assert response.status_code == 401 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Invalid token: could not decode your API token"]} + assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]} current_app.config['ADMIN_CLIENT_SECRETS'][0] = api_secret @@ -397,7 +437,9 @@ def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_valu (False, 'wrong_key', 200), ]) def test_proxy_key_on_admin_auth_endpoint(notify_api, check_proxy_header, header_value, expected_status): - token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']) + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) with set_config_values(notify_api, { 'ROUTE_SECRET_KEY_1': 'key_1',