diff --git a/app/authentication/auth.py b/app/authentication/auth.py index c1b262b1c..4f85936f6 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -62,11 +62,21 @@ def requires_admin_auth(): if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'): g.service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') - secret = "" if len(current_app.config.get('ADMIN_CLIENT_SECRETS')): - secret = current_app.config.get('ADMIN_CLIENT_SECRETS')[0] + for secret in current_app.config.get('ADMIN_CLIENT_SECRETS'): + try: + decode_jwt_token(auth_token, secret) + return + except TokenExpiredError: + raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) + except TokenDecodeError: + # TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions + # (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather + # than continue on to check the next API key + continue - return handle_admin_key(auth_token, secret) + # Either there are no admin client secrets or their token didn't match one of them so error + raise AuthError("Unauthorized: admin authentication token not found", 401) else: raise AuthError('Unauthorized: admin authentication token required', 401) @@ -135,12 +145,3 @@ def __get_token_issuer(auth_token): except TokenDecodeError: raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403) return issuer - - -def handle_admin_key(auth_token, secret): - try: - decode_jwt_token(auth_token, secret) - except TokenExpiredError: - raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) - except TokenDecodeError: - raise AuthError("Invalid token: could not decode your API token", 403) diff --git a/app/config.py b/app/config.py index 4a716d037..a86cdd401 100644 --- a/app/config.py +++ b/app/config.py @@ -64,8 +64,8 @@ class Config(object): # URL of api app (on AWS this is the internal api endpoint) API_HOST_NAME = os.getenv('API_HOST_NAME') - # admin app api key - ADMIN_CLIENT_SECRETS = [os.getenv('ADMIN_CLIENT_SECRET')] if os.getenv('ADMIN_CLIENT_SECRET') else [] + # admin app api keys + ADMIN_CLIENT_SECRETS = json.loads(os.environ.get('ADMIN_CLIENT_SECRETS', '[]')) # encyption secret/salt SECRET_KEY = os.getenv('SECRET_KEY') diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 252c42739..9bc6ed1ab 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -123,7 +123,30 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client, sample_api_key) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: requires_admin_auth() - assert exc.value.short_message == "Invalid token: could not decode your API token" + assert exc.value.short_message == "Unauthorized: admin authentication token not found" + + +def test_admin_auth_should_not_allow_request_with_old_iat(client): + iss = current_app.config['ADMIN_CLIENT_USER_NAME'] + secret = current_app.config['ADMIN_CLIENT_SECRETS'][0] + + # code copied from notifications_python_client.authentication.py::create_jwt_token + headers = { + "typ": 'JWT', + "alg": 'HS256' + } + + claims = { + 'iss': iss, + 'iat': int(time.time()) - 60 + } + + token = jwt.encode(payload=claims, key=secret, headers=headers).decode() + + request.headers = {'Authorization': 'Bearer {}'.format(token)} + with pytest.raises(AuthError) as exc: + requires_admin_auth() + assert exc.value.short_message == "Invalid token: expired, check that your system clock is accurate" def test_auth_should_not_allow_request_with_extra_claims(client, sample_api_key): @@ -188,11 +211,28 @@ def test_should_allow_valid_token_for_request_with_path_params_for_public_url(cl def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(client): - token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']) + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 +def test_should_allow_valid_token_for_request_with_path_params_for_admin_url_with_second_secret(client): + with set_config(client.application, 'ADMIN_CLIENT_SECRETS', ["secret1", "secret2"]): + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 + + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][1], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 + + def test_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key): data = {'service': sample_api_key.service, 'name': 'some key name', @@ -274,9 +314,9 @@ def test_authentication_returns_error_when_admin_client_has_no_secrets(client): response = client.get( '/service', headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 + assert response.status_code == 401 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Invalid token: could not decode your API token"]} + assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]} def test_authentication_returns_error_when_admin_client_secret_is_invalid(client): @@ -289,9 +329,9 @@ def test_authentication_returns_error_when_admin_client_secret_is_invalid(client response = client.get( '/service', headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 + assert response.status_code == 401 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Invalid token: could not decode your API token"]} + assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]} current_app.config['ADMIN_CLIENT_SECRETS'][0] = api_secret @@ -397,7 +437,9 @@ def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_valu (False, 'wrong_key', 200), ]) def test_proxy_key_on_admin_auth_endpoint(notify_api, check_proxy_header, header_value, expected_status): - token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME']) + token = create_jwt_token( + current_app.config['ADMIN_CLIENT_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] + ) with set_config_values(notify_api, { 'ROUTE_SECRET_KEY_1': 'key_1',