mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-01 07:35:34 -05:00
Simplify tests for get auth token / issuer
This switches to testing the two functions directly as trying to test them through the top-level "requires_..." functions or calls to endpoints doesn't scale as we add more of them. While this has a slight risk that a "requires_..." function might not be using these helpers, it seems unlikely and we can always add a mock to check this if we're concerned in future.
This commit is contained in:
@@ -56,19 +56,6 @@ class InternalApiKey():
|
|||||||
self.expiry_date = None
|
self.expiry_date = None
|
||||||
|
|
||||||
|
|
||||||
def get_auth_token(req):
|
|
||||||
auth_header = req.headers.get('Authorization', None)
|
|
||||||
if not auth_header:
|
|
||||||
raise AuthError('Unauthorized: authentication token must be provided', 401)
|
|
||||||
|
|
||||||
auth_scheme = auth_header[:7].title()
|
|
||||||
|
|
||||||
if auth_scheme != 'Bearer ':
|
|
||||||
raise AuthError('Unauthorized: authentication bearer scheme must be used', 401)
|
|
||||||
|
|
||||||
return auth_header[7:]
|
|
||||||
|
|
||||||
|
|
||||||
def requires_no_auth():
|
def requires_no_auth():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -82,8 +69,8 @@ def requires_internal_auth(expected_client_id):
|
|||||||
raise TypeError("Unknown client_id for internal auth")
|
raise TypeError("Unknown client_id for internal auth")
|
||||||
|
|
||||||
request_helper.check_proxy_header_before_request()
|
request_helper.check_proxy_header_before_request()
|
||||||
auth_token = get_auth_token(request)
|
auth_token = _get_auth_token(request)
|
||||||
client_id = __get_token_issuer(auth_token)
|
client_id = _get_token_issuer(auth_token)
|
||||||
|
|
||||||
if client_id != expected_client_id:
|
if client_id != expected_client_id:
|
||||||
raise AuthError("Unauthorized: not allowed to perform this action", 401)
|
raise AuthError("Unauthorized: not allowed to perform this action", 401)
|
||||||
@@ -100,8 +87,8 @@ def requires_internal_auth(expected_client_id):
|
|||||||
def requires_auth():
|
def requires_auth():
|
||||||
request_helper.check_proxy_header_before_request()
|
request_helper.check_proxy_header_before_request()
|
||||||
|
|
||||||
auth_token = get_auth_token(request)
|
auth_token = _get_auth_token(request)
|
||||||
issuer = __get_token_issuer(auth_token) # ie the `iss` claim which should be a service ID
|
issuer = _get_token_issuer(auth_token) # ie the `iss` claim which should be a service ID
|
||||||
|
|
||||||
try:
|
try:
|
||||||
service_id = uuid.UUID(issuer)
|
service_id = uuid.UUID(issuer)
|
||||||
@@ -164,7 +151,20 @@ def _decode_jwt_token(auth_token, api_keys, service_id=None):
|
|||||||
raise AuthError("Invalid token: API key not found", 403, service_id=service_id)
|
raise AuthError("Invalid token: API key not found", 403, service_id=service_id)
|
||||||
|
|
||||||
|
|
||||||
def __get_token_issuer(auth_token):
|
def _get_auth_token(req):
|
||||||
|
auth_header = req.headers.get('Authorization', None)
|
||||||
|
if not auth_header:
|
||||||
|
raise AuthError('Unauthorized: authentication token must be provided', 401)
|
||||||
|
|
||||||
|
auth_scheme = auth_header[:7].title()
|
||||||
|
|
||||||
|
if auth_scheme != 'Bearer ':
|
||||||
|
raise AuthError('Unauthorized: authentication bearer scheme must be used', 401)
|
||||||
|
|
||||||
|
return auth_header[7:]
|
||||||
|
|
||||||
|
|
||||||
|
def _get_token_issuer(auth_token):
|
||||||
try:
|
try:
|
||||||
issuer = get_token_issuer(auth_token)
|
issuer = get_token_issuer(auth_token)
|
||||||
except TokenIssuerError:
|
except TokenIssuerError:
|
||||||
|
|||||||
@@ -13,6 +13,8 @@ from app import api_user
|
|||||||
from app.authentication.auth import (
|
from app.authentication.auth import (
|
||||||
GENERAL_TOKEN_ERROR_MESSAGE,
|
GENERAL_TOKEN_ERROR_MESSAGE,
|
||||||
AuthError,
|
AuthError,
|
||||||
|
_get_auth_token,
|
||||||
|
_get_token_issuer,
|
||||||
requires_admin_auth,
|
requires_admin_auth,
|
||||||
requires_auth,
|
requires_auth,
|
||||||
)
|
)
|
||||||
@@ -62,39 +64,40 @@ def admin_jwt_token(admin_jwt_client_id, admin_jwt_secret):
|
|||||||
return create_jwt_token(admin_jwt_secret, admin_jwt_client_id)
|
return create_jwt_token(admin_jwt_secret, admin_jwt_client_id)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth])
|
def test_get_auth_token_should_not_allow_request_with_no_token(client):
|
||||||
def test_should_not_allow_request_with_no_token(client, auth_fn):
|
|
||||||
request.headers = {}
|
request.headers = {}
|
||||||
with pytest.raises(AuthError) as exc:
|
with pytest.raises(AuthError) as exc:
|
||||||
auth_fn()
|
_get_auth_token(request)
|
||||||
assert exc.value.short_message == 'Unauthorized: authentication token must be provided'
|
assert exc.value.short_message == 'Unauthorized: authentication token must be provided'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth])
|
def test_get_auth_token_should_not_allow_request_with_incorrect_header(client):
|
||||||
def test_should_not_allow_request_with_incorrect_header(client, auth_fn):
|
|
||||||
request.headers = {'Authorization': 'Basic 1234'}
|
request.headers = {'Authorization': 'Basic 1234'}
|
||||||
with pytest.raises(AuthError) as exc:
|
with pytest.raises(AuthError) as exc:
|
||||||
auth_fn()
|
_get_auth_token(request)
|
||||||
assert exc.value.short_message == 'Unauthorized: authentication bearer scheme must be used'
|
assert exc.value.short_message == 'Unauthorized: authentication bearer scheme must be used'
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth])
|
@pytest.mark.parametrize('scheme', ['bearer', 'Bearer'])
|
||||||
def test_should_not_allow_request_with_incorrect_token(client, auth_fn):
|
def test_get_auth_token_should_allow_valid_token(client, scheme):
|
||||||
request.headers = {'Authorization': 'Bearer 1234'}
|
token = create_jwt_token(client_id='something', secret='secret')
|
||||||
|
request.headers={'Authorization': '{} {}'.format(scheme, token)}
|
||||||
|
assert _get_auth_token(request) == token
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_token_issuer_should_not_allow_request_with_incorrect_token(client):
|
||||||
with pytest.raises(AuthError) as exc:
|
with pytest.raises(AuthError) as exc:
|
||||||
auth_fn()
|
_get_token_issuer("Bearer 1234")
|
||||||
assert exc.value.short_message == GENERAL_TOKEN_ERROR_MESSAGE
|
assert exc.value.short_message == GENERAL_TOKEN_ERROR_MESSAGE
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('auth_fn', [requires_auth, requires_admin_auth])
|
def test_get_token_issuer_should_not_allow_request_with_no_iss(client):
|
||||||
def test_should_not_allow_request_with_no_iss(client, auth_fn):
|
|
||||||
token = create_custom_jwt_token(
|
token = create_custom_jwt_token(
|
||||||
payload={'iat': int(time.time())}
|
payload={'iat': int(time.time())}
|
||||||
)
|
)
|
||||||
|
|
||||||
request.headers = {'Authorization': 'Bearer {}'.format(token)}
|
|
||||||
with pytest.raises(AuthError) as exc:
|
with pytest.raises(AuthError) as exc:
|
||||||
auth_fn()
|
_get_token_issuer(token)
|
||||||
assert exc.value.short_message == 'Invalid token: iss field not provided'
|
assert exc.value.short_message == 'Invalid token: iss field not provided'
|
||||||
|
|
||||||
|
|
||||||
@@ -186,21 +189,6 @@ def test_requires_auth_should_not_allow_invalid_secret(client, sample_api_key):
|
|||||||
assert data['message'] == {"token": ['Invalid token: API key not found']}
|
assert data['message'] == {"token": ['Invalid token: API key not found']}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('scheme', ['bearer', 'Bearer'])
|
|
||||||
def test_requires_auth_should_allow_valid_token(
|
|
||||||
client,
|
|
||||||
sample_api_key,
|
|
||||||
service_jwt_secret,
|
|
||||||
scheme,
|
|
||||||
):
|
|
||||||
token = create_jwt_token(
|
|
||||||
client_id=str(sample_api_key.service_id),
|
|
||||||
secret=service_jwt_secret,
|
|
||||||
)
|
|
||||||
response = client.get('/notifications', headers={'Authorization': '{} {}'.format(scheme, token)})
|
|
||||||
assert response.status_code == 200
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('service_id', ['not-a-valid-id', 1234])
|
@pytest.mark.parametrize('service_id', ['not-a-valid-id', 1234])
|
||||||
def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type(
|
def test_requires_auth_should_not_allow_service_id_with_the_wrong_data_type(
|
||||||
client,
|
client,
|
||||||
|
|||||||
Reference in New Issue
Block a user