From 49455d9890b94ee7f1ad2370f5ec8acc3f88a6ca Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Mon, 26 Jul 2021 16:45:10 +0100 Subject: [PATCH 01/16] Support granular API auth for internal apps Previously we just had a single array of API keys / secrets, any of which could be used to get past the "requires_admin_auth" check. While multiple keys are necessary to allow for rotation, we should avoid giving other apps access this way (too much privilege). This converts the existing config vars into a new dictionary, keyed by client_id. We can then use the dictionary to scope auth for new API consumers like gov.uk/alerts to just the endpoints they need to access, while maintaining existing access for the Admin app. Once the new dictionary is available as a JSON environment variable, we'll be able to remove the old credentials / config. In the next commits, we'll look at more tests for the new functionality. --- app/authentication/auth.py | 45 ++++++----- app/config.py | 15 +++- tests/__init__.py | 2 +- .../app/authentication/test_authentication.py | 78 +++++++++---------- 4 files changed, 78 insertions(+), 62 deletions(-) diff --git a/app/authentication/auth.py b/app/authentication/auth.py index 9db851e47..4ca1e5c88 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -67,30 +67,37 @@ def requires_no_auth(): def requires_admin_auth(): + requires_internal_auth(current_app.config.get('ADMIN_CLIENT_USER_NAME')) + + +def requires_internal_auth(expected_client_id): + if expected_client_id not in current_app.config.get('INTERNAL_CLIENT_API_KEYS'): + raise TypeError("Unknown client_id for internal auth") + request_helper.check_proxy_header_before_request() - auth_token = get_auth_token(request) - client = __get_token_issuer(auth_token) + client_id = __get_token_issuer(auth_token) - if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'): - g.service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') + if client_id != expected_client_id: + raise AuthError("Unauthorized: not allowed to perform this action", 401) - for secret in current_app.config.get('API_INTERNAL_SECRETS'): - try: - decode_jwt_token(auth_token, secret) - return - except TokenExpiredError: - raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) - except TokenDecodeError: - # TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions - # (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather - # than continue on to check the next admin client secret - continue + g.service_id = client_id + secrets = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id] - # Either there are no admin client secrets or their token didn't match one of them so error - raise AuthError("Unauthorized: admin authentication token not found", 401) - else: - raise AuthError('Unauthorized: admin authentication token required', 401) + for secret in secrets: + try: + decode_jwt_token(auth_token, secret) + return + except TokenExpiredError: + raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) + except TokenDecodeError: + # TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions + # (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather + # than continue on to check the next admin client secret + continue + + # Either there are no admin client secrets or their token didn't match one of them so error + raise AuthError("Unauthorized: API authentication token not found", 401) def requires_auth(): diff --git a/app/config.py b/app/config.py index 973eb3e28..74d679001 100644 --- a/app/config.py +++ b/app/config.py @@ -84,9 +84,16 @@ class Config(object): # URL of api app (on AWS this is the internal api endpoint) API_HOST_NAME = os.getenv('API_HOST_NAME') - # secrets that internal apps, such as the admin app or document download, must use to authenticate with the API + # LEGACY: replacing with INTERNAL_CLIENT_API_KEYS API_INTERNAL_SECRETS = json.loads(os.environ.get('API_INTERNAL_SECRETS', '[]')) + # secrets that internal apps, such as the admin app or document download, must use to authenticate with the API + ADMIN_CLIENT_USER_NAME = 'notify-admin' + + INTERNAL_CLIENT_API_KEYS = { + ADMIN_CLIENT_USER_NAME: API_INTERNAL_SECRETS + } + # encyption secret/salt SECRET_KEY = os.getenv('SECRET_KEY') DANGEROUS_SALT = os.getenv('DANGEROUS_SALT') @@ -129,7 +136,6 @@ class Config(object): ########################### NOTIFY_ENVIRONMENT = 'development' - ADMIN_CLIENT_USER_NAME = 'notify-admin' AWS_REGION = 'eu-west-1' INVITATION_EXPIRATION_DAYS = 2 NOTIFY_APP_NAME = 'api' @@ -399,7 +405,10 @@ class Development(Config): TRANSIENT_UPLOADED_LETTERS = 'development-transient-uploaded-letters' LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise' - API_INTERNAL_SECRETS = ['dev-notify-secret-key'] + INTERNAL_CLIENT_API_KEYS = { + Config.ADMIN_CLIENT_USER_NAME: ['dev-notify-secret-key'] + } + SECRET_KEY = 'dev-notify-secret-key' DANGEROUS_SALT = 'dev-notify-salt' diff --git a/tests/__init__.py b/tests/__init__.py index 42f5b3a6d..c276a0170 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -28,7 +28,7 @@ def create_authorization_header(service_id=None, key_type=KEY_TYPE_NORMAL): else: client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] - secret = current_app.config['API_INTERNAL_SECRETS'][0] + secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] token = create_jwt_token(secret=secret, client_id=client_id) return 'Authorization', 'Bearer {}'.format(token) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index e675fccf3..c9f32157f 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -116,8 +116,8 @@ def test_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_a def test_admin_auth_should_not_allow_request_with_no_iat(client): - iss = current_app.config['ADMIN_CLIENT_USER_NAME'] - secret = current_app.config['API_INTERNAL_SECRETS'][0] + client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] # code copied from notifications_python_client.authentication.py::create_jwt_token headers = { @@ -126,7 +126,7 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client): } claims = { - 'iss': iss + 'iss': client_id, # 'iat': not provided } @@ -135,12 +135,12 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client): request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: requires_admin_auth() - assert exc.value.short_message == "Unauthorized: admin authentication token not found" + assert exc.value.short_message == "Unauthorized: API authentication token not found" def test_admin_auth_should_not_allow_request_with_old_iat(client): - iss = current_app.config['ADMIN_CLIENT_USER_NAME'] - secret = current_app.config['API_INTERNAL_SECRETS'][0] + client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] # code copied from notifications_python_client.authentication.py::create_jwt_token headers = { @@ -149,7 +149,7 @@ def test_admin_auth_should_not_allow_request_with_old_iat(client): } claims = { - 'iss': iss, + 'iss': client_id, 'iat': int(time.time()) - 60 } @@ -224,24 +224,24 @@ def test_should_allow_valid_token_for_request_with_path_params_for_public_url(cl def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(client): - token = create_jwt_token( - current_app.config['API_INTERNAL_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] - ) + client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] + + token = create_jwt_token(secret, client_id) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 def test_should_allow_valid_token_for_request_with_path_params_for_admin_url_with_second_secret(client): - with set_config(client.application, 'API_INTERNAL_SECRETS', ["secret1", "secret2"]): - token = create_jwt_token( - current_app.config['API_INTERNAL_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] - ) + client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + new_secrets = {client_id: ["secret1", "secret2"]} + + with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): + token = create_jwt_token("secret1", client_id) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 - token = create_jwt_token( - current_app.config['API_INTERNAL_SECRETS'][1], current_app.config['ADMIN_CLIENT_USER_NAME'] - ) + token = create_jwt_token("secret2", client_id) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 @@ -317,35 +317,35 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_ def test_authentication_returns_error_when_admin_client_has_no_secrets(client): - api_secret = current_app.config.get('API_INTERNAL_SECRETS')[0] - api_service_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') - token = create_jwt_token( - secret=api_secret, - client_id=api_service_id - ) - with set_config(client.application, 'API_INTERNAL_SECRETS', []): + client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') + secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] + token = create_jwt_token(secret, client_id) + new_secrets = {client_id: []} + + with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): response = client.get( '/service', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 401 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]} + assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} def test_authentication_returns_error_when_admin_client_secret_is_invalid(client): - api_secret = current_app.config.get('API_INTERNAL_SECRETS')[0] - token = create_jwt_token( - secret=api_secret, - client_id=current_app.config.get('ADMIN_CLIENT_USER_NAME') - ) - current_app.config['API_INTERNAL_SECRETS'][0] = 'something-wrong' - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) + client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') + secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] + token = create_jwt_token(secret, client_id) + new_secrets = {client_id: ['something-wrong']} + + with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): + response = client.get( + '/service', + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 401 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Unauthorized: admin authentication token not found"]} - current_app.config['API_INTERNAL_SECRETS'][0] = api_secret + assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} def test_authentication_returns_error_when_service_doesnt_exit( @@ -450,9 +450,9 @@ def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_valu (False, 'wrong_key', 200), ]) def test_proxy_key_on_admin_auth_endpoint(notify_api, check_proxy_header, header_value, expected_status): - token = create_jwt_token( - current_app.config['API_INTERNAL_SECRETS'][0], current_app.config['ADMIN_CLIENT_USER_NAME'] - ) + client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') + secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] + token = create_jwt_token(secret, client_id) with set_config_values(notify_api, { 'ROUTE_SECRET_KEY_1': 'key_1', From 2788682b0abc911d821752723ebf780a00a13b56 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 27 Jul 2021 15:07:00 +0100 Subject: [PATCH 02/16] DRY-up creating JWT tokens manually in auth tests This makes it a bit easier to see what tests are missing. --- .../app/authentication/test_authentication.py | 109 ++++++------------ 1 file changed, 33 insertions(+), 76 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index c9f32157f..2282e3009 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -28,6 +28,12 @@ from app.models import KEY_TYPE_NORMAL, ApiKey from tests.conftest import set_config, set_config_values +def create_custom_jwt_token(headers=None, payload=None, key=None): + # code copied from notifications_python_client.authentication.py::create_jwt_token + headers = headers or {"typ": 'JWT', "alg": 'HS256'} + return jwt.encode(payload=payload, key=key or str(uuid.uuid4()), headers=headers) + + @pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) def test_should_not_allow_request_with_no_token(client, auth_fn): request.headers = {} @@ -54,18 +60,9 @@ def test_should_not_allow_request_with_incorrect_token(client, auth_fn): @pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) def test_should_not_allow_request_with_no_iss(client, auth_fn): - # code copied from notifications_python_client.authentication.py::create_jwt_token - headers = { - "typ": 'JWT', - "alg": 'HS256' - } - - claims = { - # 'iss': not provided - 'iat': int(time.time()) - } - - token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers) + token = create_custom_jwt_token( + payload={'iat': int(time.time())} + ) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: @@ -73,20 +70,10 @@ def test_should_not_allow_request_with_no_iss(client, auth_fn): assert exc.value.short_message == 'Invalid token: iss field not provided' -def test_auth_should_not_allow_request_with_no_iat(client, sample_api_key): - iss = str(sample_api_key.service_id) - # code copied from notifications_python_client.authentication.py::create_jwt_token - headers = { - "typ": 'JWT', - "alg": 'HS256' - } - - claims = { - 'iss': iss - # 'iat': not provided - } - - token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers) +def test_should_not_allow_request_with_no_iat(client, sample_api_key): + token = create_custom_jwt_token( + payload={'iss': str(sample_api_key.service_id)} + ) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: @@ -95,19 +82,10 @@ def test_auth_should_not_allow_request_with_no_iat(client, sample_api_key): def test_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_api_key): - iss = str(sample_api_key.service_id) - # code copied from notifications_python_client.authentication.py::create_jwt_token - headers = { - "typ": 'JWT', - "alg": 'HS512' - } - - claims = { - 'iss': iss, - 'iat': int(time.time()) - } - - token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers) + token = create_custom_jwt_token( + headers={"typ": 'JWT', "alg": 'HS512'}, + payload={'iss': str(sample_api_key.service_id), 'iat': int(time.time())} + ) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: @@ -119,18 +97,10 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client): client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] - # code copied from notifications_python_client.authentication.py::create_jwt_token - headers = { - "typ": 'JWT', - "alg": 'HS256' - } - - claims = { - 'iss': client_id, - # 'iat': not provided - } - - token = jwt.encode(payload=claims, key=secret, headers=headers) + token = create_custom_jwt_token( + payload={'iss': client_id}, + key=secret + ) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: @@ -142,18 +112,10 @@ def test_admin_auth_should_not_allow_request_with_old_iat(client): client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] - # code copied from notifications_python_client.authentication.py::create_jwt_token - headers = { - "typ": 'JWT', - "alg": 'HS256' - } - - claims = { - 'iss': client_id, - 'iat': int(time.time()) - 60 - } - - token = jwt.encode(payload=claims, key=secret, headers=headers) + token = create_custom_jwt_token( + payload={'iss': client_id, 'iat': int(time.time()) - 60}, + key=secret + ) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: @@ -162,21 +124,16 @@ def test_admin_auth_should_not_allow_request_with_old_iat(client): def test_auth_should_not_allow_request_with_extra_claims(client, sample_api_key): - iss = str(sample_api_key.service_id) key = get_unsigned_secrets(sample_api_key.service_id)[0] - headers = { - "typ": 'JWT', - "alg": 'HS256' - } - - claims = { - 'iss': iss, - 'iat': int(time.time()), - 'aud': 'notifications.service.gov.uk' # extra claim that we don't support - } - - token = jwt.encode(payload=claims, key=key, headers=headers) + token = create_custom_jwt_token( + payload={ + 'iss': str(sample_api_key.service_id), + 'iat': int(time.time()), + 'aud': 'notifications.service.gov.uk' # extra claim that we don't support + }, + key=key + ) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: From 36afd4210f47e28fcd88d54bb1ebb5b5c1a96dd3 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 27 Jul 2021 15:19:46 +0100 Subject: [PATCH 03/16] Rename tests to match the function under test This makes it easier to see what we're missing. I also tweaked one test where a parameter wasn't necessary. --- .../app/authentication/test_authentication.py | 69 ++++++++++--------- 1 file changed, 35 insertions(+), 34 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 2282e3009..33048e249 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -70,7 +70,7 @@ def test_should_not_allow_request_with_no_iss(client, auth_fn): assert exc.value.short_message == 'Invalid token: iss field not provided' -def test_should_not_allow_request_with_no_iat(client, sample_api_key): +def test_requires_auth_should_not_allow_request_with_no_iat(client, sample_api_key): token = create_custom_jwt_token( payload={'iss': str(sample_api_key.service_id)} ) @@ -81,7 +81,7 @@ def test_should_not_allow_request_with_no_iat(client, sample_api_key): assert exc.value.short_message == 'Invalid token: API key not found' -def test_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_api_key): +def test_requires_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_api_key): token = create_custom_jwt_token( headers={"typ": 'JWT', "alg": 'HS512'}, payload={'iss': str(sample_api_key.service_id), 'iat': int(time.time())} @@ -93,7 +93,7 @@ def test_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_a assert exc.value.short_message == 'Invalid token: algorithm used is not HS256' -def test_admin_auth_should_not_allow_request_with_no_iat(client): +def test_requires_admin_auth_should_not_allow_request_with_no_iat(client): client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] @@ -108,7 +108,7 @@ def test_admin_auth_should_not_allow_request_with_no_iat(client): assert exc.value.short_message == "Unauthorized: API authentication token not found" -def test_admin_auth_should_not_allow_request_with_old_iat(client): +def test_requires_admin_auth_should_not_allow_request_with_old_iat(client): client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] @@ -123,7 +123,7 @@ def test_admin_auth_should_not_allow_request_with_old_iat(client): assert exc.value.short_message == "Invalid token: expired, check that your system clock is accurate" -def test_auth_should_not_allow_request_with_extra_claims(client, sample_api_key): +def test_requires_auth_should_not_allow_request_with_extra_claims(client, sample_api_key): key = get_unsigned_secrets(sample_api_key.service_id)[0] token = create_custom_jwt_token( @@ -141,7 +141,7 @@ def test_auth_should_not_allow_request_with_extra_claims(client, sample_api_key) assert exc.value.short_message == GENERAL_TOKEN_ERROR_MESSAGE -def test_should_not_allow_invalid_secret(client, sample_api_key): +def test_requires_auth_should_not_allow_invalid_secret(client, sample_api_key): token = create_jwt_token( secret="not-so-secret", client_id=str(sample_api_key.service_id)) @@ -155,14 +155,14 @@ def test_should_not_allow_invalid_secret(client, sample_api_key): @pytest.mark.parametrize('scheme', ['bearer', 'Bearer']) -def test_should_allow_valid_token(client, sample_api_key, scheme): +def test_requires_auth_should_allow_valid_token(client, sample_api_key, scheme): token = __create_token(sample_api_key.service_id) response = client.get('/notifications', headers={'Authorization': '{} {}'.format(scheme, token)}) assert response.status_code == 200 @pytest.mark.parametrize('service_id', ['not-a-valid-id', 1234]) -def test_should_not_allow_service_id_that_is_not_the_wrong_data_type(client, sample_api_key, service_id): +def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type(client, sample_api_key, service_id): token = create_jwt_token(secret=get_unsigned_secrets(sample_api_key.service_id)[0], client_id=service_id) response = client.get( @@ -174,13 +174,13 @@ def test_should_not_allow_service_id_that_is_not_the_wrong_data_type(client, sam assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} -def test_should_allow_valid_token_for_request_with_path_params_for_public_url(client, sample_api_key): +def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url(client, sample_api_key): token = __create_token(sample_api_key.service_id) response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 -def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(client): +def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params(client): client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] @@ -189,7 +189,7 @@ def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(cli assert response.status_code == 200 -def test_should_allow_valid_token_for_request_with_path_params_for_admin_url_with_second_secret(client): +def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params_with_second_secret(client): client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] new_secrets = {client_id: ["secret1", "secret2"]} @@ -203,7 +203,7 @@ def test_should_allow_valid_token_for_request_with_path_params_for_admin_url_wit assert response.status_code == 200 -def test_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key): +def test_requires_auth_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key): data = {'service': sample_api_key.service, 'name': 'some key name', 'created_by': sample_api_key.created_by, @@ -218,7 +218,7 @@ def test_should_allow_valid_token_when_service_has_multiple_keys(client, sample_ assert response.status_code == 200 -def test_authentication_passes_when_service_has_multiple_keys_some_expired( +def test_requires_auth_passes_when_service_has_multiple_keys_some_expired( client, sample_api_key): expired_key_data = {'service': sample_api_key.service, @@ -245,8 +245,9 @@ def test_authentication_passes_when_service_has_multiple_keys_some_expired( assert response.status_code == 200 -def test_authentication_returns_token_expired_when_service_uses_expired_key_and_has_multiple_keys(client, - sample_api_key): +def test_requires_auth_returns_token_expired_when_service_uses_expired_key_and_has_multiple_keys( + client, sample_api_key +): expired_key = {'service': sample_api_key.service, 'name': 'expired_key', 'created_by': sample_api_key.created_by, @@ -273,7 +274,7 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_ assert exc.value.api_key_id == expired_api_key.id -def test_authentication_returns_error_when_admin_client_has_no_secrets(client): +def test_requires_admin_auth_returns_error_with_no_secrets(client): client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] token = create_jwt_token(secret, client_id) @@ -289,7 +290,7 @@ def test_authentication_returns_error_when_admin_client_has_no_secrets(client): assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} -def test_authentication_returns_error_when_admin_client_secret_is_invalid(client): +def test_requires_admin_auth_returns_error_when_secret_is_invalid(client): client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] token = create_jwt_token(secret, client_id) @@ -305,7 +306,7 @@ def test_authentication_returns_error_when_admin_client_secret_is_invalid(client assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} -def test_authentication_returns_error_when_service_doesnt_exit( +def test_requires_auth_returns_error_when_service_doesnt_exit( client, sample_api_key ): @@ -323,7 +324,7 @@ def test_authentication_returns_error_when_service_doesnt_exit( assert error_message['message'] == {'token': ['Invalid token: service not found']} -def test_authentication_returns_error_when_service_inactive(client, sample_api_key): +def test_requires_auth_returns_error_when_service_inactive(client, sample_api_key): sample_api_key.service.active = False token = create_jwt_token(secret=str(sample_api_key.id), client_id=str(sample_api_key.service_id)) @@ -334,9 +335,9 @@ def test_authentication_returns_error_when_service_inactive(client, sample_api_k assert error_message['message'] == {'token': ['Invalid token: service is archived']} -def test_authentication_returns_error_when_service_has_no_secrets(client, - sample_service, - fake_uuid): +def test_requires_auth_returns_error_when_service_has_no_secrets( + client, sample_service, fake_uuid +): token = create_jwt_token( secret=fake_uuid, client_id=str(sample_service.id)) @@ -359,8 +360,9 @@ def test_should_attach_the_current_api_key_to_current_app(notify_api, sample_ser assert str(api_user.id) == str(sample_api_key.id) -def test_should_return_403_when_token_is_expired(client, - sample_api_key): +def test_requires_auth_return_403_when_token_is_expired( + client, sample_api_key +): with freeze_time('2001-01-01T12:00:00'): token = __create_token(sample_api_key.service_id) with freeze_time('2001-01-01T12:00:40'): @@ -377,13 +379,13 @@ def __create_token(service_id): client_id=str(service_id)) -@pytest.mark.parametrize('check_proxy_header,header_value,expected_status', [ - (True, 'key_1', 200), - (True, 'wrong_key', 200), - (False, 'key_1', 200), - (False, 'wrong_key', 200), +@pytest.mark.parametrize('check_proxy_header,header_value', [ + (True, 'key_1'), + (True, 'wrong_key'), + (False, 'key_1'), + (False, 'wrong_key'), ]) -def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_value, expected_status): +def test_requires_no_auth_proxy_key(notify_api, check_proxy_header, header_value): with set_config_values(notify_api, { 'ROUTE_SECRET_KEY_1': 'key_1', 'ROUTE_SECRET_KEY_2': '', @@ -397,7 +399,7 @@ def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_valu ('X-Custom-Forwarder', header_value), ] ) - assert response.status_code == expected_status + assert response.status_code == 200 @pytest.mark.parametrize('check_proxy_header,header_value,expected_status', [ @@ -406,7 +408,7 @@ def test_proxy_key_non_auth_endpoint(notify_api, check_proxy_header, header_valu (False, 'key_1', 200), (False, 'wrong_key', 200), ]) -def test_proxy_key_on_admin_auth_endpoint(notify_api, check_proxy_header, header_value, expected_status): +def test_requires_admin_auth_proxy_key(notify_api, check_proxy_header, header_value, expected_status): client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] token = create_jwt_token(secret, client_id) @@ -428,8 +430,7 @@ def test_proxy_key_on_admin_auth_endpoint(notify_api, check_proxy_header, header assert response.status_code == expected_status -def test_should_cache_service_and_api_key_lookups(mocker, client, sample_api_key): - +def test_requires_auth_should_cache_service_and_api_key_lookups(mocker, client, sample_api_key): mock_get_api_keys = mocker.patch( 'app.serialised_models.get_model_api_keys', wraps=get_model_api_keys, From e08d726f0579e023c565f5cf42dff5dc95e41a0d Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 27 Jul 2021 17:25:22 +0100 Subject: [PATCH 04/16] DRY-up and simplify creating JWT tokens in tests Previously this was heavily duplicated but with the odd test using a __create_token method. This adds some fixtures to remove all the boilerplate and standardise how tokens are created in each test. --- .../app/authentication/test_authentication.py | 232 ++++++++++++------ 1 file changed, 151 insertions(+), 81 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 33048e249..43b5c4f02 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -34,6 +34,34 @@ def create_custom_jwt_token(headers=None, payload=None, key=None): return jwt.encode(payload=payload, key=key or str(uuid.uuid4()), headers=headers) +@pytest.fixture +def service_jwt_secret(sample_api_key): + return get_unsigned_secrets(sample_api_key.service_id)[0] + + +@pytest.fixture +def service_jwt_token(sample_api_key, service_jwt_secret): + return create_jwt_token( + secret=service_jwt_secret, + client_id=str(sample_api_key.service_id), + ) + + +@pytest.fixture +def admin_jwt_client_id(): + return current_app.config['ADMIN_CLIENT_USER_NAME'] + + +@pytest.fixture +def admin_jwt_secret(admin_jwt_client_id): + return current_app.config['INTERNAL_CLIENT_API_KEYS'][admin_jwt_client_id][0] + + +@pytest.fixture +def admin_jwt_token(admin_jwt_client_id, admin_jwt_secret): + return create_jwt_token(admin_jwt_secret, admin_jwt_client_id) + + @pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) def test_should_not_allow_request_with_no_token(client, auth_fn): request.headers = {} @@ -93,13 +121,14 @@ def test_requires_auth_should_not_allow_request_with_non_hs256_algorithm(client, assert exc.value.short_message == 'Invalid token: algorithm used is not HS256' -def test_requires_admin_auth_should_not_allow_request_with_no_iat(client): - client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] - secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] - +def test_requires_admin_auth_should_not_allow_request_with_no_iat( + client, + admin_jwt_client_id, + admin_jwt_secret, +): token = create_custom_jwt_token( - payload={'iss': client_id}, - key=secret + payload={'iss': admin_jwt_client_id}, + key=admin_jwt_secret ) request.headers = {'Authorization': 'Bearer {}'.format(token)} @@ -108,13 +137,14 @@ def test_requires_admin_auth_should_not_allow_request_with_no_iat(client): assert exc.value.short_message == "Unauthorized: API authentication token not found" -def test_requires_admin_auth_should_not_allow_request_with_old_iat(client): - client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] - secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] - +def test_requires_admin_auth_should_not_allow_request_with_old_iat( + client, + admin_jwt_client_id, + admin_jwt_secret, +): token = create_custom_jwt_token( - payload={'iss': client_id, 'iat': int(time.time()) - 60}, - key=secret + payload={'iss': admin_jwt_client_id, 'iat': int(time.time()) - 60}, + key=admin_jwt_secret ) request.headers = {'Authorization': 'Bearer {}'.format(token)} @@ -123,16 +153,18 @@ def test_requires_admin_auth_should_not_allow_request_with_old_iat(client): assert exc.value.short_message == "Invalid token: expired, check that your system clock is accurate" -def test_requires_auth_should_not_allow_request_with_extra_claims(client, sample_api_key): - key = get_unsigned_secrets(sample_api_key.service_id)[0] - +def test_requires_auth_should_not_allow_request_with_extra_claims( + client, + sample_api_key, + service_jwt_secret, +): token = create_custom_jwt_token( payload={ 'iss': str(sample_api_key.service_id), 'iat': int(time.time()), 'aud': 'notifications.service.gov.uk' # extra claim that we don't support }, - key=key + key=service_jwt_secret, ) request.headers = {'Authorization': 'Bearer {}'.format(token)} @@ -155,16 +187,30 @@ def test_requires_auth_should_not_allow_invalid_secret(client, sample_api_key): @pytest.mark.parametrize('scheme', ['bearer', 'Bearer']) -def test_requires_auth_should_allow_valid_token(client, sample_api_key, scheme): - token = __create_token(sample_api_key.service_id) +def test_requires_auth_should_allow_valid_token( + client, + sample_api_key, + service_jwt_secret, + scheme, +): + token = create_jwt_token( + client_id=str(sample_api_key.service_id), + secret=service_jwt_secret, + ) response = client.get('/notifications', headers={'Authorization': '{} {}'.format(scheme, token)}) assert response.status_code == 200 @pytest.mark.parametrize('service_id', ['not-a-valid-id', 1234]) -def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type(client, sample_api_key, service_id): - token = create_jwt_token(secret=get_unsigned_secrets(sample_api_key.service_id)[0], - client_id=service_id) +def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type( + client, + service_jwt_secret, + service_id +): + token = create_jwt_token( + client_id=service_id, + secret=service_jwt_secret, + ) response = client.get( '/notifications', headers={'Authorization': "Bearer {}".format(token)} @@ -174,36 +220,43 @@ def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type(clie assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} -def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url(client, sample_api_key): - token = __create_token(sample_api_key.service_id) - response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(token)}) +def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url( + client, + service_jwt_token, +): + response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) assert response.status_code == 200 -def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params(client): - client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] - secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] - - token = create_jwt_token(secret, client_id) - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) +def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params( + client, + admin_jwt_token +): + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) assert response.status_code == 200 -def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params_with_second_secret(client): - client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] - new_secrets = {client_id: ["secret1", "secret2"]} +def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params_with_second_secret( + client, + admin_jwt_client_id, +): + new_secrets = {admin_jwt_client_id: ["secret1", "secret2"]} with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): - token = create_jwt_token("secret1", client_id) + token = create_jwt_token("secret1", admin_jwt_client_id) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 - token = create_jwt_token("secret2", client_id) + token = create_jwt_token("secret2", admin_jwt_client_id) response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 200 -def test_requires_auth_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key): +def test_requires_auth_should_allow_valid_token_when_service_has_multiple_keys( + client, + sample_api_key, + service_jwt_token, +): data = {'service': sample_api_key.service, 'name': 'some key name', 'created_by': sample_api_key.created_by, @@ -211,16 +264,16 @@ def test_requires_auth_should_allow_valid_token_when_service_has_multiple_keys(c } api_key = ApiKey(**data) save_model_api_key(api_key) - token = __create_token(sample_api_key.service_id) response = client.get( '/notifications', - headers={'Authorization': 'Bearer {}'.format(token)}) + headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) assert response.status_code == 200 def test_requires_auth_passes_when_service_has_multiple_keys_some_expired( - client, - sample_api_key): + client, + sample_api_key, +): expired_key_data = {'service': sample_api_key.service, 'name': 'expired_key', 'expiry_date': datetime.utcnow(), @@ -237,8 +290,9 @@ def test_requires_auth_passes_when_service_has_multiple_keys_some_expired( api_key = ApiKey(**another_key) save_model_api_key(api_key) token = create_jwt_token( - secret=get_unsigned_secret(api_key.id), - client_id=str(sample_api_key.service_id)) + client_id=str(sample_api_key.service_id), + secret=get_unsigned_secret(api_key.id) + ) response = client.get( '/notifications', headers={'Authorization': 'Bearer {}'.format(token)}) @@ -246,7 +300,8 @@ def test_requires_auth_passes_when_service_has_multiple_keys_some_expired( def test_requires_auth_returns_token_expired_when_service_uses_expired_key_and_has_multiple_keys( - client, sample_api_key + client, + sample_api_key ): expired_key = {'service': sample_api_key.service, 'name': 'expired_key', @@ -263,8 +318,9 @@ def test_requires_auth_returns_token_expired_when_service_uses_expired_key_and_h api_key = ApiKey(**another_key) save_model_api_key(api_key) token = create_jwt_token( - secret=get_unsigned_secret(expired_api_key.id), - client_id=str(sample_api_key.service_id)) + client_id=str(sample_api_key.service_id), + secret=get_unsigned_secret(expired_api_key.id) + ) expire_api_key(service_id=sample_api_key.service_id, api_key_id=expired_api_key.id) request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: @@ -274,39 +330,41 @@ def test_requires_auth_returns_token_expired_when_service_uses_expired_key_and_h assert exc.value.api_key_id == expired_api_key.id -def test_requires_admin_auth_returns_error_with_no_secrets(client): - client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') - secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] - token = create_jwt_token(secret, client_id) - new_secrets = {client_id: []} +def test_requires_admin_auth_returns_error_with_no_secrets( + client, + admin_jwt_client_id, + admin_jwt_token, +): + new_secrets = {admin_jwt_client_id: []} with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): response = client.get( '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) + headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) assert response.status_code == 401 error_message = json.loads(response.get_data()) assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} -def test_requires_admin_auth_returns_error_when_secret_is_invalid(client): - client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') - secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] - token = create_jwt_token(secret, client_id) - new_secrets = {client_id: ['something-wrong']} +def test_requires_admin_auth_returns_error_when_secret_is_invalid( + client, + admin_jwt_client_id, + admin_jwt_token, +): + new_secrets = {admin_jwt_client_id: ['something-wrong']} with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): response = client.get( '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) + headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) assert response.status_code == 401 error_message = json.loads(response.get_data()) assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} -def test_requires_auth_returns_error_when_service_doesnt_exit( +def test_requires_auth_returns_error_when_service_doesnt_exist( client, sample_api_key ): @@ -324,11 +382,13 @@ def test_requires_auth_returns_error_when_service_doesnt_exit( assert error_message['message'] == {'token': ['Invalid token: service not found']} -def test_requires_auth_returns_error_when_service_inactive(client, sample_api_key): +def test_requires_auth_returns_error_when_service_inactive( + client, + sample_api_key, + service_jwt_token, +): sample_api_key.service.active = False - token = create_jwt_token(secret=str(sample_api_key.id), client_id=str(sample_api_key.service_id)) - - response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(token)}) + response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) assert response.status_code == 403 error_message = json.loads(response.get_data()) @@ -349,22 +409,31 @@ def test_requires_auth_returns_error_when_service_has_no_secrets( assert exc.value.service_id == str(sample_service.id) -def test_should_attach_the_current_api_key_to_current_app(notify_api, sample_service, sample_api_key): +def test_should_attach_the_current_api_key_to_current_app( + notify_api, + sample_service, + sample_api_key, + service_jwt_token, +): with notify_api.test_request_context(), notify_api.test_client() as client: - token = __create_token(sample_api_key.service_id) response = client.get( '/notifications', - headers={'Authorization': 'Bearer {}'.format(token)} + headers={'Authorization': 'Bearer {}'.format(service_jwt_token)} ) assert response.status_code == 200 assert str(api_user.id) == str(sample_api_key.id) def test_requires_auth_return_403_when_token_is_expired( - client, sample_api_key + client, + sample_api_key, + service_jwt_secret, ): with freeze_time('2001-01-01T12:00:00'): - token = __create_token(sample_api_key.service_id) + token = create_jwt_token( + client_id=str(sample_api_key.service_id), + secret=service_jwt_secret, + ) with freeze_time('2001-01-01T12:00:40'): with pytest.raises(AuthError) as exc: request.headers = {'Authorization': 'Bearer {}'.format(token)} @@ -374,11 +443,6 @@ def test_requires_auth_return_403_when_token_is_expired( assert str(exc.value.api_key_id) == str(sample_api_key.id) -def __create_token(service_id): - return create_jwt_token(secret=get_unsigned_secrets(service_id)[0], - client_id=str(service_id)) - - @pytest.mark.parametrize('check_proxy_header,header_value', [ (True, 'key_1'), (True, 'wrong_key'), @@ -408,11 +472,13 @@ def test_requires_no_auth_proxy_key(notify_api, check_proxy_header, header_value (False, 'key_1', 200), (False, 'wrong_key', 200), ]) -def test_requires_admin_auth_proxy_key(notify_api, check_proxy_header, header_value, expected_status): - client_id = current_app.config.get('ADMIN_CLIENT_USER_NAME') - secret = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id][0] - token = create_jwt_token(secret, client_id) - +def test_requires_admin_auth_proxy_key( + notify_api, + check_proxy_header, + header_value, + expected_status, + admin_jwt_token, +): with set_config_values(notify_api, { 'ROUTE_SECRET_KEY_1': 'key_1', 'ROUTE_SECRET_KEY_2': '', @@ -424,13 +490,18 @@ def test_requires_admin_auth_proxy_key(notify_api, check_proxy_header, header_va path='/service', headers=[ ('X-Custom-Forwarder', header_value), - ('Authorization', 'Bearer {}'.format(token)) + ('Authorization', 'Bearer {}'.format(admin_jwt_token)) ] ) assert response.status_code == expected_status -def test_requires_auth_should_cache_service_and_api_key_lookups(mocker, client, sample_api_key): +def test_requires_auth_should_cache_service_and_api_key_lookups( + mocker, + client, + sample_api_key, + service_jwt_token +): mock_get_api_keys = mocker.patch( 'app.serialised_models.get_model_api_keys', wraps=get_model_api_keys, @@ -441,9 +512,8 @@ def test_requires_auth_should_cache_service_and_api_key_lookups(mocker, client, ) for _ in range(5): - token = __create_token(sample_api_key.service_id) client.get('/notifications', headers={ - 'Authorization': f'Bearer {token}' + 'Authorization': f'Bearer {service_jwt_token}' }) assert mock_get_api_keys.call_args_list == [ From 1d806d65eb89238347352e7cc3dc8cf770eb7eae Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 27 Jul 2021 18:04:12 +0100 Subject: [PATCH 05/16] Standardise auth checks for both kinds of API auth Previously "requires_auth" and "requires_admin_auth" had similar but different ways of checking their keys. This switches them to use the same checks, with the admin / internal auth passing in a fake / stub set of "api keys" to check. Pulling out the logic this way will make it easier to unpick the tests, so we can focus on testing what's unique to each kind of API auth and avoid future duplication when we start calling the "requires_internal_auth" method with other client_ids. Note that a couple of error messages / response codes have changed for admin / internal auth. None of these occur in practice, so we can make them consistent with the behaviour for the public API. --- app/authentication/auth.py | 58 ++++++++++--------- .../app/authentication/test_authentication.py | 12 ++-- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/app/authentication/auth.py b/app/authentication/auth.py index 4ca1e5c88..1b882edd1 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -49,6 +49,13 @@ class AuthError(Exception): } +class InternalApiKey(): + def __init__(self, client_id, secret): + self.secret = secret + self.id = client_id + self.expiry_date = None + + def get_auth_token(req): auth_header = req.headers.get('Authorization', None) if not auth_header: @@ -81,23 +88,13 @@ def requires_internal_auth(expected_client_id): if client_id != expected_client_id: raise AuthError("Unauthorized: not allowed to perform this action", 401) + api_keys = [ + InternalApiKey(client_id, secret) + for secret in current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id] + ] + + _decode_jwt_token(auth_token, api_keys, client_id) g.service_id = client_id - secrets = current_app.config.get('INTERNAL_CLIENT_API_KEYS')[client_id] - - for secret in secrets: - try: - decode_jwt_token(auth_token, secret) - return - except TokenExpiredError: - raise AuthError("Invalid token: expired, check that your system clock is accurate", 403) - except TokenDecodeError: - # TODO: Change this so it doesn't also catch `TokenIssuerError` or `TokenIssuedAtError` exceptions - # (which are children of `TokenDecodeError`) as these should cause an auth error immediately rather - # than continue on to check the next admin client secret - continue - - # Either there are no admin client secrets or their token didn't match one of them so error - raise AuthError("Unauthorized: API authentication token not found", 401) def requires_auth(): @@ -123,15 +120,23 @@ def requires_auth(): if not service.active: raise AuthError("Invalid token: service is archived", 403, service_id=service.id) - for api_key in service.api_keys: + api_key = _decode_jwt_token(auth_token, service.api_keys, service.id) + + g.api_user = api_key + g.service_id = service_id + g.authenticated_service = service + + +def _decode_jwt_token(auth_token, api_keys, service_id=None): + for api_key in api_keys: try: decode_jwt_token(auth_token, api_key.secret) except TokenExpiredError: err_msg = "Error: Your system clock must be accurate to within 30 seconds" - raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id) + raise AuthError(err_msg, 403, service_id=service_id, api_key_id=api_key.id) except TokenAlgorithmError: err_msg = "Invalid token: algorithm used is not HS256" - raise AuthError(err_msg, 403, service_id=service.id, api_key_id=api_key.id) + raise AuthError(err_msg, 403, service_id=service_id, api_key_id=api_key.id) except TokenDecodeError: # we attempted to validate the token but it failed meaning it was not signed using this api key. # Let's try the next one @@ -141,25 +146,22 @@ def requires_auth(): continue except TokenError: # General error when trying to decode and validate the token - raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403, service_id=service.id, api_key_id=api_key.id) + raise AuthError(GENERAL_TOKEN_ERROR_MESSAGE, 403, service_id=service_id, api_key_id=api_key.id) if api_key.expiry_date: - raise AuthError("Invalid token: API key revoked", 403, service_id=service.id, api_key_id=api_key.id) - - g.service_id = service.id - g.api_user = api_key - g.authenticated_service = service + raise AuthError("Invalid token: API key revoked", 403, service_id=service_id, api_key_id=api_key.id) current_app.logger.info('API authorised for service {} with api key {}, using issuer {} for URL: {}'.format( - service.id, + service_id, api_key.id, request.headers.get('User-Agent'), request.base_url )) - return + + return api_key else: # service has API keys, but none matching the one the user provided - raise AuthError("Invalid token: API key not found", 403, service_id=service.id) + raise AuthError("Invalid token: API key not found", 403, service_id=service_id) def __get_token_issuer(auth_token): diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 43b5c4f02..b981ad24e 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -134,7 +134,7 @@ def test_requires_admin_auth_should_not_allow_request_with_no_iat( request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: requires_admin_auth() - assert exc.value.short_message == "Unauthorized: API authentication token not found" + assert exc.value.short_message == "Invalid token: API key not found" def test_requires_admin_auth_should_not_allow_request_with_old_iat( @@ -150,7 +150,7 @@ def test_requires_admin_auth_should_not_allow_request_with_old_iat( request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: requires_admin_auth() - assert exc.value.short_message == "Invalid token: expired, check that your system clock is accurate" + assert exc.value.short_message == "Error: Your system clock must be accurate to within 30 seconds" def test_requires_auth_should_not_allow_request_with_extra_claims( @@ -342,9 +342,9 @@ def test_requires_admin_auth_returns_error_with_no_secrets( '/service', headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) - assert response.status_code == 401 + assert response.status_code == 403 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} + assert error_message['message'] == {"token": ["Invalid token: API key not found"]} def test_requires_admin_auth_returns_error_when_secret_is_invalid( @@ -359,9 +359,9 @@ def test_requires_admin_auth_returns_error_when_secret_is_invalid( '/service', headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) - assert response.status_code == 401 + assert response.status_code == 403 error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Unauthorized: API authentication token not found"]} + assert error_message['message'] == {"token": ["Invalid token: API key not found"]} def test_requires_auth_returns_error_when_service_doesnt_exist( From 2c568698d1278c3d3c2f7204bbf8202fc0b02080 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Wed, 28 Jul 2021 13:55:55 +0100 Subject: [PATCH 06/16] Simplify tests for get auth token / issuer This switches to testing the two functions directly as trying to test them through the top-level "requires_..." functions or calls to endpoints doesn't scale as we add more of them. While this has a slight risk that a "requires_..." function might not be using these helpers, it seems unlikely and we can always add a mock to check this if we're concerned in future. --- app/authentication/auth.py | 36 +++++++-------- .../app/authentication/test_authentication.py | 46 +++++++------------ 2 files changed, 35 insertions(+), 47 deletions(-) diff --git a/app/authentication/auth.py b/app/authentication/auth.py index 1b882edd1..d475ce83d 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -56,19 +56,6 @@ class InternalApiKey(): self.expiry_date = None -def get_auth_token(req): - auth_header = req.headers.get('Authorization', None) - if not auth_header: - raise AuthError('Unauthorized: authentication token must be provided', 401) - - auth_scheme = auth_header[:7].title() - - if auth_scheme != 'Bearer ': - raise AuthError('Unauthorized: authentication bearer scheme must be used', 401) - - return auth_header[7:] - - def requires_no_auth(): pass @@ -82,8 +69,8 @@ def requires_internal_auth(expected_client_id): raise TypeError("Unknown client_id for internal auth") request_helper.check_proxy_header_before_request() - auth_token = get_auth_token(request) - client_id = __get_token_issuer(auth_token) + auth_token = _get_auth_token(request) + client_id = _get_token_issuer(auth_token) if client_id != expected_client_id: raise AuthError("Unauthorized: not allowed to perform this action", 401) @@ -100,8 +87,8 @@ def requires_internal_auth(expected_client_id): def requires_auth(): request_helper.check_proxy_header_before_request() - auth_token = get_auth_token(request) - issuer = __get_token_issuer(auth_token) # ie the `iss` claim which should be a service ID + auth_token = _get_auth_token(request) + issuer = _get_token_issuer(auth_token) # ie the `iss` claim which should be a service ID try: service_id = uuid.UUID(issuer) @@ -164,7 +151,20 @@ def _decode_jwt_token(auth_token, api_keys, service_id=None): raise AuthError("Invalid token: API key not found", 403, service_id=service_id) -def __get_token_issuer(auth_token): +def _get_auth_token(req): + auth_header = req.headers.get('Authorization', None) + if not auth_header: + raise AuthError('Unauthorized: authentication token must be provided', 401) + + auth_scheme = auth_header[:7].title() + + if auth_scheme != 'Bearer ': + raise AuthError('Unauthorized: authentication bearer scheme must be used', 401) + + return auth_header[7:] + + +def _get_token_issuer(auth_token): try: issuer = get_token_issuer(auth_token) except TokenIssuerError: diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index b981ad24e..ef663dc79 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -13,6 +13,8 @@ from app import api_user from app.authentication.auth import ( GENERAL_TOKEN_ERROR_MESSAGE, AuthError, + _get_auth_token, + _get_token_issuer, requires_admin_auth, requires_auth, ) @@ -62,39 +64,40 @@ def admin_jwt_token(admin_jwt_client_id, admin_jwt_secret): return create_jwt_token(admin_jwt_secret, admin_jwt_client_id) -@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) -def test_should_not_allow_request_with_no_token(client, auth_fn): +def test_get_auth_token_should_not_allow_request_with_no_token(client): request.headers = {} with pytest.raises(AuthError) as exc: - auth_fn() + _get_auth_token(request) assert exc.value.short_message == 'Unauthorized: authentication token must be provided' -@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) -def test_should_not_allow_request_with_incorrect_header(client, auth_fn): +def test_get_auth_token_should_not_allow_request_with_incorrect_header(client): request.headers = {'Authorization': 'Basic 1234'} with pytest.raises(AuthError) as exc: - auth_fn() + _get_auth_token(request) assert exc.value.short_message == 'Unauthorized: authentication bearer scheme must be used' -@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) -def test_should_not_allow_request_with_incorrect_token(client, auth_fn): - request.headers = {'Authorization': 'Bearer 1234'} +@pytest.mark.parametrize('scheme', ['bearer', 'Bearer']) +def test_get_auth_token_should_allow_valid_token(client, scheme): + token = create_jwt_token(client_id='something', secret='secret') + request.headers={'Authorization': '{} {}'.format(scheme, token)} + assert _get_auth_token(request) == token + + +def test_get_token_issuer_should_not_allow_request_with_incorrect_token(client): with pytest.raises(AuthError) as exc: - auth_fn() + _get_token_issuer("Bearer 1234") assert exc.value.short_message == GENERAL_TOKEN_ERROR_MESSAGE -@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth]) -def test_should_not_allow_request_with_no_iss(client, auth_fn): +def test_get_token_issuer_should_not_allow_request_with_no_iss(client): token = create_custom_jwt_token( payload={'iat': int(time.time())} ) - request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: - auth_fn() + _get_token_issuer(token) assert exc.value.short_message == 'Invalid token: iss field not provided' @@ -186,21 +189,6 @@ def test_requires_auth_should_not_allow_invalid_secret(client, sample_api_key): assert data['message'] == {"token": ['Invalid token: API key not found']} -@pytest.mark.parametrize('scheme', ['bearer', 'Bearer']) -def test_requires_auth_should_allow_valid_token( - client, - sample_api_key, - service_jwt_secret, - scheme, -): - token = create_jwt_token( - client_id=str(sample_api_key.service_id), - secret=service_jwt_secret, - ) - response = client.get('/notifications', headers={'Authorization': '{} {}'.format(scheme, token)}) - assert response.status_code == 200 - - @pytest.mark.parametrize('service_id', ['not-a-valid-id', 1234]) def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type( client, From 9da937ab1d793ab4a861d71f4bb456e7d1b141ac Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Wed, 28 Jul 2021 17:32:23 +0100 Subject: [PATCH 07/16] DRY-up tests for decoding JWT tokens Previously we had a lot of duplicate tests inconsistently checking each of the "requires_" functions. Since both of them now use the same "_decode_jwt_token" helper, we can consolidate all the tests onto that. In future commits we'll look at testing the top-level functions in terms of what they do specifically. --- .../app/authentication/test_authentication.py | 312 +++++------------- 1 file changed, 88 insertions(+), 224 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index ef663dc79..571fef4c6 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -1,33 +1,27 @@ import time import uuid -from datetime import datetime from unittest.mock import call import jwt import pytest from flask import current_app, json, request -from freezegun import freeze_time from notifications_python_client.authentication import create_jwt_token from app import api_user from app.authentication.auth import ( GENERAL_TOKEN_ERROR_MESSAGE, AuthError, + _decode_jwt_token, _get_auth_token, _get_token_issuer, - requires_admin_auth, - requires_auth, ) from app.dao.api_key_dao import ( expire_api_key, get_model_api_keys, - get_unsigned_secret, get_unsigned_secrets, - save_model_api_key, ) from app.dao.services_dao import dao_fetch_service_by_id -from app.models import KEY_TYPE_NORMAL, ApiKey -from tests.conftest import set_config, set_config_values +from tests.conftest import set_config_values def create_custom_jwt_token(headers=None, payload=None, key=None): @@ -50,17 +44,9 @@ def service_jwt_token(sample_api_key, service_jwt_secret): @pytest.fixture -def admin_jwt_client_id(): - return current_app.config['ADMIN_CLIENT_USER_NAME'] - - -@pytest.fixture -def admin_jwt_secret(admin_jwt_client_id): - return current_app.config['INTERNAL_CLIENT_API_KEYS'][admin_jwt_client_id][0] - - -@pytest.fixture -def admin_jwt_token(admin_jwt_client_id, admin_jwt_secret): +def admin_jwt_token(): + admin_jwt_client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + admin_jwt_secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][admin_jwt_client_id][0] return create_jwt_token(admin_jwt_secret, admin_jwt_client_id) @@ -81,7 +67,7 @@ def test_get_auth_token_should_not_allow_request_with_incorrect_header(client): @pytest.mark.parametrize('scheme', ['bearer', 'Bearer']) def test_get_auth_token_should_allow_valid_token(client, scheme): token = create_jwt_token(client_id='something', secret='secret') - request.headers={'Authorization': '{} {}'.format(scheme, token)} + request.headers = {'Authorization': '{} {}'.format(scheme, token)} assert _get_auth_token(request) == token @@ -101,92 +87,131 @@ def test_get_token_issuer_should_not_allow_request_with_no_iss(client): assert exc.value.short_message == 'Invalid token: iss field not provided' -def test_requires_auth_should_not_allow_request_with_no_iat(client, sample_api_key): - token = create_custom_jwt_token( - payload={'iss': str(sample_api_key.service_id)} - ) - - request.headers = {'Authorization': 'Bearer {}'.format(token)} - with pytest.raises(AuthError) as exc: - requires_auth() - assert exc.value.short_message == 'Invalid token: API key not found' - - -def test_requires_auth_should_not_allow_request_with_non_hs256_algorithm(client, sample_api_key): +def test_decode_jwt_token_should_not_allow_non_hs256_algorithm(client, sample_api_key): token = create_custom_jwt_token( headers={"typ": 'JWT', "alg": 'HS512'}, - payload={'iss': str(sample_api_key.service_id), 'iat': int(time.time())} + payload={}, ) - request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: - requires_auth() + _decode_jwt_token(token, [sample_api_key]) assert exc.value.short_message == 'Invalid token: algorithm used is not HS256' -def test_requires_admin_auth_should_not_allow_request_with_no_iat( +def test_decode_jwt_token_should_not_allow_no_iat( client, - admin_jwt_client_id, - admin_jwt_secret, + sample_api_key, ): token = create_custom_jwt_token( - payload={'iss': admin_jwt_client_id}, - key=admin_jwt_secret + payload={'iss': 'something'} ) - request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: - requires_admin_auth() + _decode_jwt_token(token, [sample_api_key]) assert exc.value.short_message == "Invalid token: API key not found" -def test_requires_admin_auth_should_not_allow_request_with_old_iat( +def test_decode_jwt_token_should_not_allow_old_iat( client, - admin_jwt_client_id, - admin_jwt_secret, + sample_api_key, ): token = create_custom_jwt_token( - payload={'iss': admin_jwt_client_id, 'iat': int(time.time()) - 60}, - key=admin_jwt_secret + payload={'iss': 'something', 'iat': int(time.time()) - 60}, + key=sample_api_key.secret, ) - request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: - requires_admin_auth() + _decode_jwt_token(token, [sample_api_key]) assert exc.value.short_message == "Error: Your system clock must be accurate to within 30 seconds" -def test_requires_auth_should_not_allow_request_with_extra_claims( +def test_decode_jwt_token_should_not_allow_extra_claims( client, sample_api_key, - service_jwt_secret, ): token = create_custom_jwt_token( payload={ - 'iss': str(sample_api_key.service_id), + 'iss': 'something', 'iat': int(time.time()), 'aud': 'notifications.service.gov.uk' # extra claim that we don't support }, - key=service_jwt_secret, + key=sample_api_key.secret, ) - request.headers = {'Authorization': 'Bearer {}'.format(token)} with pytest.raises(AuthError) as exc: - requires_auth() + _decode_jwt_token(token, [sample_api_key]) assert exc.value.short_message == GENERAL_TOKEN_ERROR_MESSAGE -def test_requires_auth_should_not_allow_invalid_secret(client, sample_api_key): +def test_decode_jwt_token_should_not_allow_invalid_secret( + client, + sample_api_key +): token = create_jwt_token( secret="not-so-secret", - client_id=str(sample_api_key.service_id)) - response = client.get( - '/notifications', - headers={'Authorization': "Bearer {}".format(token)} + client_id=str(sample_api_key.service_id) ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: API key not found']} + + with pytest.raises(AuthError) as exc: + _decode_jwt_token(token, [sample_api_key]) + assert exc.value.short_message == 'Invalid token: API key not found' + + +def test_decode_jwt_token_should_allow_multiple_api_keys( + client, + sample_api_key, + sample_test_api_key, +): + token = create_jwt_token( + secret=sample_test_api_key.secret, + client_id=str(sample_test_api_key.service_id), + ) + + # successful if no error is raised + _decode_jwt_token(token, [sample_api_key, sample_test_api_key]) + + +def test_decode_jwt_token_should_allow_some_expired_keys( + client, + sample_api_key, + sample_test_api_key, +): + expire_api_key(sample_api_key.service_id, sample_api_key.id) + + token = create_jwt_token( + secret=sample_test_api_key.secret, + client_id=str(sample_test_api_key.service_id), + ) + + # successful if no error is raised + _decode_jwt_token(token, [sample_api_key, sample_test_api_key]) + + +def test_decode_jwt_token_errors_when_all_api_keys_are_expired( + client, + sample_api_key, + sample_test_api_key, +): + expire_api_key(sample_api_key.service_id, sample_api_key.id) + expire_api_key(sample_test_api_key.service_id, sample_test_api_key.id) + + token = create_jwt_token( + secret=sample_test_api_key.secret, + client_id=str(sample_test_api_key.service_id), + ) + + with pytest.raises(AuthError) as exc: + _decode_jwt_token(token, [sample_api_key, sample_test_api_key], service_id='1234') + + assert exc.value.short_message == 'Invalid token: API key revoked' + assert exc.value.service_id == '1234' + assert exc.value.api_key_id == sample_test_api_key.id + + +def test_decode_jwt_token_returns_error_with_no_secrets(client): + with pytest.raises(AuthError) as exc: + _decode_jwt_token('token', []) + assert exc.value.short_message == "Invalid token: API key not found" @pytest.mark.parametrize('service_id', ['not-a-valid-id', 1234]) @@ -224,134 +249,6 @@ def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_para assert response.status_code == 200 -def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params_with_second_secret( - client, - admin_jwt_client_id, -): - new_secrets = {admin_jwt_client_id: ["secret1", "secret2"]} - - with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): - token = create_jwt_token("secret1", admin_jwt_client_id) - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 - - token = create_jwt_token("secret2", admin_jwt_client_id) - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 - - -def test_requires_auth_should_allow_valid_token_when_service_has_multiple_keys( - client, - sample_api_key, - service_jwt_token, -): - data = {'service': sample_api_key.service, - 'name': 'some key name', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - api_key = ApiKey(**data) - save_model_api_key(api_key) - response = client.get( - '/notifications', - headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) - assert response.status_code == 200 - - -def test_requires_auth_passes_when_service_has_multiple_keys_some_expired( - client, - sample_api_key, -): - expired_key_data = {'service': sample_api_key.service, - 'name': 'expired_key', - 'expiry_date': datetime.utcnow(), - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - expired_key = ApiKey(**expired_key_data) - save_model_api_key(expired_key) - another_key = {'service': sample_api_key.service, - 'name': 'another_key', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - api_key = ApiKey(**another_key) - save_model_api_key(api_key) - token = create_jwt_token( - client_id=str(sample_api_key.service_id), - secret=get_unsigned_secret(api_key.id) - ) - response = client.get( - '/notifications', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 - - -def test_requires_auth_returns_token_expired_when_service_uses_expired_key_and_has_multiple_keys( - client, - sample_api_key -): - expired_key = {'service': sample_api_key.service, - 'name': 'expired_key', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - expired_api_key = ApiKey(**expired_key) - save_model_api_key(expired_api_key) - another_key = {'service': sample_api_key.service, - 'name': 'another_key', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - api_key = ApiKey(**another_key) - save_model_api_key(api_key) - token = create_jwt_token( - client_id=str(sample_api_key.service_id), - secret=get_unsigned_secret(expired_api_key.id) - ) - expire_api_key(service_id=sample_api_key.service_id, api_key_id=expired_api_key.id) - request.headers = {'Authorization': 'Bearer {}'.format(token)} - with pytest.raises(AuthError) as exc: - requires_auth() - assert exc.value.short_message == 'Invalid token: API key revoked' - assert exc.value.service_id == str(expired_api_key.service_id) - assert exc.value.api_key_id == expired_api_key.id - - -def test_requires_admin_auth_returns_error_with_no_secrets( - client, - admin_jwt_client_id, - admin_jwt_token, -): - new_secrets = {admin_jwt_client_id: []} - - with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) - - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Invalid token: API key not found"]} - - -def test_requires_admin_auth_returns_error_when_secret_is_invalid( - client, - admin_jwt_client_id, - admin_jwt_token, -): - new_secrets = {admin_jwt_client_id: ['something-wrong']} - - with set_config(client.application, 'INTERNAL_CLIENT_API_KEYS', new_secrets): - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) - - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ["Invalid token: API key not found"]} - - def test_requires_auth_returns_error_when_service_doesnt_exist( client, sample_api_key @@ -383,20 +280,6 @@ def test_requires_auth_returns_error_when_service_inactive( assert error_message['message'] == {'token': ['Invalid token: service is archived']} -def test_requires_auth_returns_error_when_service_has_no_secrets( - client, sample_service, fake_uuid -): - token = create_jwt_token( - secret=fake_uuid, - client_id=str(sample_service.id)) - - request.headers = {'Authorization': 'Bearer {}'.format(token)} - with pytest.raises(AuthError) as exc: - requires_auth() - assert exc.value.short_message == 'Invalid token: service has no API keys' - assert exc.value.service_id == str(sample_service.id) - - def test_should_attach_the_current_api_key_to_current_app( notify_api, sample_service, @@ -412,25 +295,6 @@ def test_should_attach_the_current_api_key_to_current_app( assert str(api_user.id) == str(sample_api_key.id) -def test_requires_auth_return_403_when_token_is_expired( - client, - sample_api_key, - service_jwt_secret, -): - with freeze_time('2001-01-01T12:00:00'): - token = create_jwt_token( - client_id=str(sample_api_key.service_id), - secret=service_jwt_secret, - ) - with freeze_time('2001-01-01T12:00:40'): - with pytest.raises(AuthError) as exc: - request.headers = {'Authorization': 'Bearer {}'.format(token)} - requires_auth() - assert exc.value.short_message == 'Error: Your system clock must be accurate to within 30 seconds' - assert exc.value.service_id == str(sample_api_key.service_id) - assert str(exc.value.api_key_id) == str(sample_api_key.id) - - @pytest.mark.parametrize('check_proxy_header,header_value', [ (True, 'key_1'), (True, 'wrong_key'), From 65d0dc43f008dd42505747d6fc38d7937924d9a1 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 11:00:10 +0100 Subject: [PATCH 08/16] Move happy path request-based tests to top of file We can simplify the rest of the tests to avoid the boilerplate of making an actual request. But it's worth keeping these two to prove the wrapper work correctly for an arbitrary route. --- .../app/authentication/test_authentication.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 571fef4c6..acd86bec0 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -50,6 +50,22 @@ def admin_jwt_token(): return create_jwt_token(admin_jwt_secret, admin_jwt_client_id) +def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url( + client, + service_jwt_token, +): + response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) + assert response.status_code == 200 + + +def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params( + client, + admin_jwt_token +): + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) + assert response.status_code == 200 + + def test_get_auth_token_should_not_allow_request_with_no_token(client): request.headers = {} with pytest.raises(AuthError) as exc: @@ -233,22 +249,6 @@ def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type( assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} -def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url( - client, - service_jwt_token, -): - response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) - assert response.status_code == 200 - - -def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params( - client, - admin_jwt_token -): - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) - assert response.status_code == 200 - - def test_requires_auth_returns_error_when_service_doesnt_exist( client, sample_api_key From 4fc820c0cc0fb2da835b922196ac148aa459cf75 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 11:12:49 +0100 Subject: [PATCH 09/16] Remove redundant test about no auth proxy checks This is a lot of code to check we haven't written a single line, which we can just visually see isn't there. We should avoid having tests that check code _isn't_ there, as such testing is infinite. --- .../app/authentication/test_authentication.py | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index acd86bec0..6d3a3d000 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -295,29 +295,6 @@ def test_should_attach_the_current_api_key_to_current_app( assert str(api_user.id) == str(sample_api_key.id) -@pytest.mark.parametrize('check_proxy_header,header_value', [ - (True, 'key_1'), - (True, 'wrong_key'), - (False, 'key_1'), - (False, 'wrong_key'), -]) -def test_requires_no_auth_proxy_key(notify_api, check_proxy_header, header_value): - with set_config_values(notify_api, { - 'ROUTE_SECRET_KEY_1': 'key_1', - 'ROUTE_SECRET_KEY_2': '', - 'CHECK_PROXY_HEADER': check_proxy_header, - }): - - with notify_api.test_client() as client: - response = client.get( - path='/_status', - headers=[ - ('X-Custom-Forwarder', header_value), - ] - ) - assert response.status_code == 200 - - @pytest.mark.parametrize('check_proxy_header,header_value,expected_status', [ (True, 'key_1', 200), (True, 'wrong_key', 403), From 193e7e0004bc93c428e5ec588e7301a21163afe2 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 11:18:43 +0100 Subject: [PATCH 10/16] Rewrite other requires_auth tests to call function This makes the tests easier to read by avoiding request boilerplate and making a clearer link between the name of the test and that we are actually testing that specific function. --- .../app/authentication/test_authentication.py | 69 ++++++++----------- 1 file changed, 27 insertions(+), 42 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 6d3a3d000..d62070eec 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -1,10 +1,9 @@ import time import uuid -from unittest.mock import call import jwt import pytest -from flask import current_app, json, request +from flask import current_app, request from notifications_python_client.authentication import create_jwt_token from app import api_user @@ -14,6 +13,7 @@ from app.authentication.auth import ( _decode_jwt_token, _get_auth_token, _get_token_issuer, + requires_auth, ) from app.dao.api_key_dao import ( expire_api_key, @@ -240,13 +240,11 @@ def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type( client_id=service_id, secret=service_jwt_secret, ) - response = client.get( - '/notifications', - headers={'Authorization': "Bearer {}".format(token)} - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} + + request.headers = {'Authorization': "Bearer {}".format(token)} + with pytest.raises(AuthError) as exc: + requires_auth() + assert exc.value.short_message == 'Invalid token: service id is not the right data type' def test_requires_auth_returns_error_when_service_doesnt_exist( @@ -256,15 +254,13 @@ def test_requires_auth_returns_error_when_service_doesnt_exist( # get service ID and secret the wrong way around token = create_jwt_token( secret=str(sample_api_key.service_id), - client_id=str(sample_api_key.id)) - - response = client.get( - '/notifications', - headers={'Authorization': 'Bearer {}'.format(token)} + client_id=str(sample_api_key.id), ) - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {'token': ['Invalid token: service not found']} + + request.headers = {'Authorization': 'Bearer {}'.format(token)} + with pytest.raises(AuthError) as exc: + requires_auth() + assert exc.value.short_message == 'Invalid token: service not found' def test_requires_auth_returns_error_when_service_inactive( @@ -273,26 +269,21 @@ def test_requires_auth_returns_error_when_service_inactive( service_jwt_token, ): sample_api_key.service.active = False - response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(service_jwt_token)}) - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {'token': ['Invalid token: service is archived']} + request.headers = {'Authorization': 'Bearer {}'.format(service_jwt_token)} + with pytest.raises(AuthError) as exc: + requires_auth() + assert exc.value.short_message == 'Invalid token: service is archived' -def test_should_attach_the_current_api_key_to_current_app( - notify_api, - sample_service, +def test_requires_auth_should_attach_the_current_api_key_to_current_app( + client, sample_api_key, service_jwt_token, ): - with notify_api.test_request_context(), notify_api.test_client() as client: - response = client.get( - '/notifications', - headers={'Authorization': 'Bearer {}'.format(service_jwt_token)} - ) - assert response.status_code == 200 - assert str(api_user.id) == str(sample_api_key.id) + request.headers = {'Authorization': 'Bearer {}'.format(service_jwt_token)} + requires_auth() + assert str(api_user.id) == str(sample_api_key.id) @pytest.mark.parametrize('check_proxy_header,header_value,expected_status', [ @@ -328,7 +319,6 @@ def test_requires_admin_auth_proxy_key( def test_requires_auth_should_cache_service_and_api_key_lookups( mocker, client, - sample_api_key, service_jwt_token ): mock_get_api_keys = mocker.patch( @@ -340,14 +330,9 @@ def test_requires_auth_should_cache_service_and_api_key_lookups( wraps=dao_fetch_service_by_id, ) - for _ in range(5): - client.get('/notifications', headers={ - 'Authorization': f'Bearer {service_jwt_token}' - }) + request.headers = {'Authorization': f'Bearer {service_jwt_token}'} + requires_auth() + requires_auth() # second request - assert mock_get_api_keys.call_args_list == [ - call(str(sample_api_key.service_id)) - ] - assert mock_get_service.call_args_list == [ - call(sample_api_key.service_id) - ] + mock_get_api_keys.assert_called_once() + mock_get_service.assert_called_once() From f5d2edaa0a9bb101559272e0f1f8b8f60fc59d19 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 11:52:25 +0100 Subject: [PATCH 11/16] Rewrite requires_admin_auth tests to be generic This adds previously missing tests and changes the existing ones to test the "requires_internal_auth" function directly. In order to make the tests generic, we have a fake auth function and an associated fixture. Having generic tests for internal auth will make it easier to add other "requires..." functions in future. --- .../app/authentication/test_authentication.py | 98 ++++++++++++------- 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index d62070eec..50e2711e2 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -3,7 +3,7 @@ import uuid import jwt import pytest -from flask import current_app, request +from flask import current_app, g, request from notifications_python_client.authentication import create_jwt_token from app import api_user @@ -14,6 +14,7 @@ from app.authentication.auth import ( _get_auth_token, _get_token_issuer, requires_auth, + requires_internal_auth, ) from app.dao.api_key_dao import ( expire_api_key, @@ -24,6 +25,23 @@ from app.dao.services_dao import dao_fetch_service_by_id from tests.conftest import set_config_values +@pytest.fixture +def internal_jwt_token(notify_api): + with set_config_values(notify_api, { + 'INTERNAL_CLIENT_API_KEYS': { + 'my-internal-app': ['my-internal-app-secret'], + } + }): + yield create_jwt_token( + client_id='my-internal-app', + secret='my-internal-app-secret' + ) + + +def requires_my_internal_app_auth(): + requires_internal_auth('my-internal-app') + + def create_custom_jwt_token(headers=None, payload=None, key=None): # code copied from notifications_python_client.authentication.py::create_jwt_token headers = headers or {"typ": 'JWT', "alg": 'HS256'} @@ -43,13 +61,6 @@ def service_jwt_token(sample_api_key, service_jwt_secret): ) -@pytest.fixture -def admin_jwt_token(): - admin_jwt_client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] - admin_jwt_secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][admin_jwt_client_id][0] - return create_jwt_token(admin_jwt_secret, admin_jwt_client_id) - - def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url( client, service_jwt_token, @@ -58,10 +69,11 @@ def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for assert response.status_code == 200 -def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params( - client, - admin_jwt_token -): +def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params(client): + admin_jwt_client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + admin_jwt_secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][admin_jwt_client_id][0] + admin_jwt_token = create_jwt_token(admin_jwt_secret, admin_jwt_client_id) + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(admin_jwt_token)}) assert response.status_code == 200 @@ -286,34 +298,44 @@ def test_requires_auth_should_attach_the_current_api_key_to_current_app( assert str(api_user.id) == str(sample_api_key.id) -@pytest.mark.parametrize('check_proxy_header,header_value,expected_status', [ - (True, 'key_1', 200), - (True, 'wrong_key', 403), - (False, 'key_1', 200), - (False, 'wrong_key', 200), -]) -def test_requires_admin_auth_proxy_key( - notify_api, - check_proxy_header, - header_value, - expected_status, - admin_jwt_token, +def test_requires_internal_auth_checks_proxy_key( + client, + mocker, + internal_jwt_token, ): - with set_config_values(notify_api, { - 'ROUTE_SECRET_KEY_1': 'key_1', - 'ROUTE_SECRET_KEY_2': '', - 'CHECK_PROXY_HEADER': check_proxy_header, - }): + proxy_check_mock = mocker.patch( + 'app.authentication.auth.request_helper.check_proxy_header_before_request' + ) - with notify_api.test_client() as client: - response = client.get( - path='/service', - headers=[ - ('X-Custom-Forwarder', header_value), - ('Authorization', 'Bearer {}'.format(admin_jwt_token)) - ] - ) - assert response.status_code == expected_status + request.headers = {'Authorization': 'Bearer {}'.format(internal_jwt_token)} + requires_my_internal_app_auth() + proxy_check_mock.assert_called_once() + + +def test_requires_internal_auth_errors_for_unknown_app(client): + with pytest.raises(TypeError) as exc: + requires_internal_auth('another-app') + assert str(exc.value) == 'Unknown client_id for internal auth' + + +def test_requires_internal_auth_errors_for_api_app_mismatch( + client, + internal_jwt_token, + service_jwt_token +): + request.headers = {'Authorization': 'Bearer {}'.format(service_jwt_token)} + with pytest.raises(AuthError) as exc: + requires_my_internal_app_auth() + assert exc.value.short_message == 'Unauthorized: not allowed to perform this action' + + +def test_requires_internal_auth_sets_global_variables( + client, + internal_jwt_token, +): + request.headers = {'Authorization': 'Bearer {}'.format(internal_jwt_token)} + requires_my_internal_app_auth() + assert g.service_id == 'my-internal-app' def test_requires_auth_should_cache_service_and_api_key_lookups( From cd0a9505714bd7ccd3eac26c5d25b39a7973ed12 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 11:55:04 +0100 Subject: [PATCH 12/16] Colocate requires_auth test with the others This is to help minimise the diff for the next commit. --- .../app/authentication/test_authentication.py | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 50e2711e2..33c26058d 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -298,6 +298,28 @@ def test_requires_auth_should_attach_the_current_api_key_to_current_app( assert str(api_user.id) == str(sample_api_key.id) +def test_requires_auth_should_cache_service_and_api_key_lookups( + mocker, + client, + service_jwt_token +): + mock_get_api_keys = mocker.patch( + 'app.serialised_models.get_model_api_keys', + wraps=get_model_api_keys, + ) + mock_get_service = mocker.patch( + 'app.serialised_models.dao_fetch_service_by_id', + wraps=dao_fetch_service_by_id, + ) + + request.headers = {'Authorization': f'Bearer {service_jwt_token}'} + requires_auth() + requires_auth() # second request + + mock_get_api_keys.assert_called_once() + mock_get_service.assert_called_once() + + def test_requires_internal_auth_checks_proxy_key( client, mocker, @@ -336,25 +358,3 @@ def test_requires_internal_auth_sets_global_variables( request.headers = {'Authorization': 'Bearer {}'.format(internal_jwt_token)} requires_my_internal_app_auth() assert g.service_id == 'my-internal-app' - - -def test_requires_auth_should_cache_service_and_api_key_lookups( - mocker, - client, - service_jwt_token -): - mock_get_api_keys = mocker.patch( - 'app.serialised_models.get_model_api_keys', - wraps=get_model_api_keys, - ) - mock_get_service = mocker.patch( - 'app.serialised_models.dao_fetch_service_by_id', - wraps=dao_fetch_service_by_id, - ) - - request.headers = {'Authorization': f'Bearer {service_jwt_token}'} - requires_auth() - requires_auth() # second request - - mock_get_api_keys.assert_called_once() - mock_get_service.assert_called_once() From 323feedb1f9bd056f9b8e938329ce94fe54aeefd Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 12:09:01 +0100 Subject: [PATCH 13/16] Backfill missing tests for requires_auth function --- .../app/authentication/test_authentication.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 33c26058d..1987c1c92 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -6,7 +6,7 @@ import pytest from flask import current_app, g, request from notifications_python_client.authentication import create_jwt_token -from app import api_user +from app import db from app.authentication.auth import ( GENERAL_TOKEN_ERROR_MESSAGE, AuthError, @@ -288,14 +288,28 @@ def test_requires_auth_returns_error_when_service_inactive( assert exc.value.short_message == 'Invalid token: service is archived' -def test_requires_auth_should_attach_the_current_api_key_to_current_app( +def test_requires_auth_should_assign_global_variables( client, sample_api_key, service_jwt_token, ): request.headers = {'Authorization': 'Bearer {}'.format(service_jwt_token)} requires_auth() - assert str(api_user.id) == str(sample_api_key.id) + assert g.api_user.id == sample_api_key.id + assert g.service_id == sample_api_key.service_id + assert g.authenticated_service.id == str(sample_api_key.service_id) + + +def test_requires_auth_errors_if_service_has_no_api_keys( + client, + sample_api_key, + service_jwt_token, +): + db.session.delete(sample_api_key) + request.headers = {'Authorization': 'Bearer {}'.format(service_jwt_token)} + with pytest.raises(AuthError) as exc: + requires_auth() + assert exc.value.short_message == 'Invalid token: service has no API keys' def test_requires_auth_should_cache_service_and_api_key_lookups( From 3e32fc99b820ee3dac9ad4e5564b63ef91adbd1f Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 12:18:10 +0100 Subject: [PATCH 14/16] Rename ADMIN_CLIENT_USER_NAME to say CLIENT_ID "user name" implies we're doing basic auth, which we're not. We should use the standard terminology for bearer tokens. --- app/authentication/auth.py | 2 +- app/config.py | 6 +++--- tests/__init__.py | 2 +- tests/app/authentication/test_authentication.py | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/app/authentication/auth.py b/app/authentication/auth.py index d475ce83d..737539a0e 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -61,7 +61,7 @@ def requires_no_auth(): def requires_admin_auth(): - requires_internal_auth(current_app.config.get('ADMIN_CLIENT_USER_NAME')) + requires_internal_auth(current_app.config.get('ADMIN_CLIENT_ID')) def requires_internal_auth(expected_client_id): diff --git a/app/config.py b/app/config.py index 74d679001..92502d6e3 100644 --- a/app/config.py +++ b/app/config.py @@ -88,10 +88,10 @@ class Config(object): API_INTERNAL_SECRETS = json.loads(os.environ.get('API_INTERNAL_SECRETS', '[]')) # secrets that internal apps, such as the admin app or document download, must use to authenticate with the API - ADMIN_CLIENT_USER_NAME = 'notify-admin' + ADMIN_CLIENT_ID = 'notify-admin' INTERNAL_CLIENT_API_KEYS = { - ADMIN_CLIENT_USER_NAME: API_INTERNAL_SECRETS + ADMIN_CLIENT_ID: API_INTERNAL_SECRETS } # encyption secret/salt @@ -406,7 +406,7 @@ class Development(Config): LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise' INTERNAL_CLIENT_API_KEYS = { - Config.ADMIN_CLIENT_USER_NAME: ['dev-notify-secret-key'] + Config.ADMIN_CLIENT_ID: ['dev-notify-secret-key'] } SECRET_KEY = 'dev-notify-secret-key' diff --git a/tests/__init__.py b/tests/__init__.py index c276a0170..5406083c9 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -27,7 +27,7 @@ def create_authorization_header(service_id=None, key_type=KEY_TYPE_NORMAL): secret = api_key.secret else: - client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] + client_id = current_app.config['ADMIN_CLIENT_ID'] secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][client_id][0] token = create_jwt_token(secret=secret, client_id=client_id) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 1987c1c92..50dd3fa08 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -61,7 +61,7 @@ def service_jwt_token(sample_api_key, service_jwt_secret): ) -def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for_public_url( +def test_requires_auth_should_allow_valid_token_for_request( client, service_jwt_token, ): @@ -69,8 +69,8 @@ def test_requires_auth_should_allow_valid_token_for_request_with_path_params_for assert response.status_code == 200 -def test_requires_admin_auth_should_allow_valid_token_for_request_with_path_params(client): - admin_jwt_client_id = current_app.config['ADMIN_CLIENT_USER_NAME'] +def test_requires_admin_auth_should_allow_valid_token_for_request(client): + admin_jwt_client_id = current_app.config['ADMIN_CLIENT_ID'] admin_jwt_secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][admin_jwt_client_id][0] admin_jwt_token = create_jwt_token(admin_jwt_secret, admin_jwt_client_id) From 4b7ad89f6ae755b04d6a63081f1a5dda6c65a1f0 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Thu, 29 Jul 2021 12:25:28 +0100 Subject: [PATCH 15/16] Add pretend authenticated API for govuk-alerts We can define the API properly in future work. I've used a separate blueprint from "broadcasts" since this API is purely internal, and it's helpful to make it clear it's specific to govuk-alerts. --- app/__init__.py | 11 ++++++++++- app/authentication/auth.py | 4 ++++ app/config.py | 4 +++- app/v2/govuk_alerts/__init__.py | 11 +++++++++++ app/v2/govuk_alerts/get_broadcasts.py | 8 ++++++++ tests/app/authentication/test_authentication.py | 9 +++++++++ 6 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 app/v2/govuk_alerts/__init__.py create mode 100644 app/v2/govuk_alerts/get_broadcasts.py diff --git a/app/__init__.py b/app/__init__.py index 09e0d195d..3e80457d2 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -271,10 +271,16 @@ def register_blueprint(application): def register_v2_blueprints(application): - from app.authentication.auth import requires_auth + from app.authentication.auth import ( + requires_auth, + requires_govuk_alerts_auth, + ) from app.v2.broadcast.post_broadcast import ( v2_broadcast_blueprint as post_broadcast, ) + from app.v2.govuk_alerts.get_broadcasts import ( + v2_govuk_alerts_blueprint as get_broadcasts, + ) from app.v2.inbound_sms.get_inbound_sms import ( v2_inbound_sms_blueprint as get_inbound_sms, ) @@ -315,6 +321,9 @@ def register_v2_blueprints(application): post_broadcast.before_request(requires_auth) application.register_blueprint(post_broadcast) + get_broadcasts.before_request(requires_govuk_alerts_auth) + application.register_blueprint(get_broadcasts) + def init_app(app): diff --git a/app/authentication/auth.py b/app/authentication/auth.py index 737539a0e..f6179dc3b 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -60,6 +60,10 @@ def requires_no_auth(): pass +def requires_govuk_alerts_auth(): + requires_internal_auth(current_app.config.get('GOVUK_ALERTS_CLIENT_ID')) + + def requires_admin_auth(): requires_internal_auth(current_app.config.get('ADMIN_CLIENT_ID')) diff --git a/app/config.py b/app/config.py index 92502d6e3..a3977c99f 100644 --- a/app/config.py +++ b/app/config.py @@ -89,6 +89,7 @@ class Config(object): # secrets that internal apps, such as the admin app or document download, must use to authenticate with the API ADMIN_CLIENT_ID = 'notify-admin' + GOVUK_ALERTS_CLIENT_ID = 'govuk-alerts' INTERNAL_CLIENT_API_KEYS = { ADMIN_CLIENT_ID: API_INTERNAL_SECRETS @@ -406,7 +407,8 @@ class Development(Config): LETTER_SANITISE_BUCKET_NAME = 'development-letters-sanitise' INTERNAL_CLIENT_API_KEYS = { - Config.ADMIN_CLIENT_ID: ['dev-notify-secret-key'] + Config.ADMIN_CLIENT_ID: ['dev-notify-secret-key'], + Config.GOVUK_ALERTS_CLIENT_ID: ['govuk-alerts-secret-key'] } SECRET_KEY = 'dev-notify-secret-key' diff --git a/app/v2/govuk_alerts/__init__.py b/app/v2/govuk_alerts/__init__.py new file mode 100644 index 000000000..09ec8456d --- /dev/null +++ b/app/v2/govuk_alerts/__init__.py @@ -0,0 +1,11 @@ +from flask import Blueprint + +from app.v2.errors import register_errors + +v2_govuk_alerts_blueprint = Blueprint( + "v2_govuk-alerts_blueprint", + __name__, + url_prefix='/v2/govuk-alerts', +) + +register_errors(v2_govuk_alerts_blueprint) diff --git a/app/v2/govuk_alerts/get_broadcasts.py b/app/v2/govuk_alerts/get_broadcasts.py new file mode 100644 index 000000000..290b01d8d --- /dev/null +++ b/app/v2/govuk_alerts/get_broadcasts.py @@ -0,0 +1,8 @@ +from flask import jsonify + +from app.v2.govuk_alerts import v2_govuk_alerts_blueprint + + +@v2_govuk_alerts_blueprint.route('') +def get_broadcasts(): + return jsonify({}) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 50dd3fa08..44291eb43 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -78,6 +78,15 @@ def test_requires_admin_auth_should_allow_valid_token_for_request(client): assert response.status_code == 200 +def test_requires_govuk_alerts_auth_should_allow_valid_token_for_request(client): + govuk_alerts_jwt_client_id = current_app.config['GOVUK_ALERTS_CLIENT_ID'] + govuk_alerts_jwt_secret = current_app.config['INTERNAL_CLIENT_API_KEYS'][govuk_alerts_jwt_client_id][0] + govuk_alerts_jwt_token = create_jwt_token(govuk_alerts_jwt_secret, govuk_alerts_jwt_client_id) + + response = client.get('/v2/govuk-alerts', headers={'Authorization': 'Bearer {}'.format(govuk_alerts_jwt_token)}) + assert response.status_code == 200 + + def test_get_auth_token_should_not_allow_request_with_no_token(client): request.headers = {} with pytest.raises(AuthError) as exc: From a3cbea218fad12b4b134dad9f49dc486632b2569 Mon Sep 17 00:00:00 2001 From: Ben Thorner Date: Tue, 3 Aug 2021 15:36:41 +0100 Subject: [PATCH 16/16] Rename "key" to "secret" for consistency While "key" is the term used by the JWT library, all the rest of our code - the ApiKey model, the Python client - all use the term "secret" instead. Although "secret" is less precise than "key", it does help avoid confusion with (api) key (as a model object). --- tests/app/authentication/test_authentication.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 44291eb43..94e10b2fe 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -42,10 +42,10 @@ def requires_my_internal_app_auth(): requires_internal_auth('my-internal-app') -def create_custom_jwt_token(headers=None, payload=None, key=None): +def create_custom_jwt_token(headers=None, payload=None, secret=None): # code copied from notifications_python_client.authentication.py::create_jwt_token headers = headers or {"typ": 'JWT', "alg": 'HS256'} - return jwt.encode(payload=payload, key=key or str(uuid.uuid4()), headers=headers) + return jwt.encode(payload=payload, key=secret or str(uuid.uuid4()), headers=headers) @pytest.fixture @@ -154,7 +154,7 @@ def test_decode_jwt_token_should_not_allow_old_iat( ): token = create_custom_jwt_token( payload={'iss': 'something', 'iat': int(time.time()) - 60}, - key=sample_api_key.secret, + secret=sample_api_key.secret, ) with pytest.raises(AuthError) as exc: @@ -172,7 +172,7 @@ def test_decode_jwt_token_should_not_allow_extra_claims( 'iat': int(time.time()), 'aud': 'notifications.service.gov.uk' # extra claim that we don't support }, - key=sample_api_key.secret, + secret=sample_api_key.secret, ) with pytest.raises(AuthError) as exc: