diff --git a/app/authentication/auth.py b/app/authentication/auth.py index 5e8cb2d4e..09ccadb66 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -1,4 +1,5 @@ -from flask import request, jsonify, _request_ctx_stack, current_app +from flask import request, _request_ctx_stack, current_app +from sqlalchemy.exc import DataError from sqlalchemy.orm.exc import NoResultFound from notifications_python_client.authentication import decode_jwt_token, get_token_issuer @@ -37,8 +38,10 @@ def requires_auth(): if client == current_app.config.get('ADMIN_CLIENT_USER_NAME'): return handle_admin_key(auth_token, current_app.config.get('ADMIN_CLIENT_SECRET')) - api_keys = get_model_api_keys(client) - + try: + api_keys = get_model_api_keys(client) + except DataError: + raise AuthError("Invalid token: service id is not the right data type", 403) for api_key in api_keys: try: get_decode_errors(auth_token, api_key.unsigned_secret) @@ -59,19 +62,19 @@ def requires_auth(): if not api_keys: raise AuthError("Invalid token: service has no API keys", 403) else: - raise AuthError("Invalid token: signature", 403) + raise AuthError("Invalid token: signature, api token is not valid", 403) def handle_admin_key(auth_token, secret): try: get_decode_errors(auth_token, secret) return - except TokenDecodeError as e: + except TokenDecodeError: raise AuthError("Invalid token: signature", 403) def get_decode_errors(auth_token, unsigned_secret): try: decode_jwt_token(auth_token, unsigned_secret) - except TokenExpiredError as e: + except TokenExpiredError: raise AuthError("Invalid token: expired", 403) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 057760585..3b9ff1009 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -53,7 +53,7 @@ def test_should_not_allow_invalid_secret(notify_api, sample_api_key): ) assert response.status_code == 403 data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: signature']} + assert data['message'] == {"token": ['Invalid token: signature, api token is not valid']} def test_should_allow_valid_token(notify_api, sample_api_key): @@ -67,6 +67,20 @@ def test_should_allow_valid_token(notify_api, sample_api_key): assert response.status_code == 200 +def test_should_not_allow_service_id_that_is_not_the_wrong_data_type(notify_api, sample_api_key): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + token = create_jwt_token(secret=get_unsigned_secrets(sample_api_key.service_id)[0], + client_id=str('not-a-valid-id')) + response = client.get( + '/service', + headers={'Authorization': "Bearer {}".format(token)} + ) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} + + def test_should_allow_valid_token_for_request_with_path_params(notify_api, sample_api_key): with notify_api.test_request_context(): with notify_api.test_client() as client: @@ -95,8 +109,6 @@ def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sam def test_authentication_passes_admin_client_token(notify_api, - notify_db, - notify_db_session, sample_api_key): with notify_api.test_request_context(): with notify_api.test_client() as client: @@ -173,8 +185,7 @@ def test_authentication_returns_token_expired_when_service_uses_expired_key_and_ def test_authentication_returns_error_when_admin_client_has_no_secrets(notify_api, - notify_db, - notify_db_session): + sample_service): with notify_api.test_request_context(): with notify_api.test_client() as client: api_secret = notify_api.config.get('ADMIN_CLIENT_SECRET') @@ -194,17 +205,14 @@ def test_authentication_returns_error_when_admin_client_has_no_secrets(notify_ap def test_authentication_returns_error_when_service_doesnt_exit( notify_api, - notify_db, - notify_db_session, - sample_service, - fake_uuid + sample_api_key ): with notify_api.test_request_context(), notify_api.test_client() as client: # get service ID and secret the wrong way around token = create_jwt_token( - secret=str(sample_service.id), - client_id=fake_uuid - ) + secret=str(sample_api_key.service_id), + client_id=str(sample_api_key.id)) + response = client.get( '/service', headers={'Authorization': 'Bearer {}'.format(token)} @@ -215,8 +223,6 @@ def test_authentication_returns_error_when_service_doesnt_exit( def test_authentication_returns_error_when_service_has_no_secrets(notify_api, - notify_db, - notify_db_session, sample_service, fake_uuid): with notify_api.test_request_context(): @@ -248,8 +254,6 @@ def test_should_attach_the_current_api_key_to_current_app(notify_api, sample_ser def test_should_return_403_when_token_is_expired(notify_api, - notify_db, - notify_db_session, sample_api_key): with notify_api.test_request_context(): with notify_api.test_client() as client: