diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index f55cb31a0..0f8d59dc3 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -13,38 +13,33 @@ from app.dao.api_key_dao import get_unsigned_secrets, save_model_api_key, get_un from app.models import ApiKey, KEY_TYPE_NORMAL -def test_should_not_allow_request_with_no_token(notify_api): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - response = client.get('/service') - assert response.status_code == 401 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Unauthorized, authentication token must be provided']} +# Test the require_admin_auth and require_auth methods +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_request_with_no_token(client, url): + response = client.get(url) + assert response.status_code == 401 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Unauthorized, authentication token must be provided']} -def test_should_not_allow_request_with_incorrect_header(notify_api): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - response = client.get( - '/service', - headers={'Authorization': 'Basic 1234'}) - assert response.status_code == 401 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Unauthorized, authentication bearer scheme must be used']} +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_request_with_incorrect_header(client, url): + response = client.get(url, headers={'Authorization': 'Basic 1234'}) + assert response.status_code == 401 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Unauthorized, authentication bearer scheme must be used']} -def test_should_not_allow_request_with_incorrect_token(notify_api, sample_user): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - response = client.get( - '/service', - headers={'Authorization': 'Bearer 1234'}) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: signature']} +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_request_with_incorrect_token(client, url): + response = client.get(url, headers={'Authorization': 'Bearer 1234'}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Invalid token: signature']} -def test_should_not_allow_request_with_no_iss(client): +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_request_with_no_iss(client, url): # code copied from notifications_python_client.authentication.py::create_jwt_token headers = { "typ": 'JWT', @@ -58,13 +53,14 @@ def test_should_not_allow_request_with_no_iss(client): token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers).decode() - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + response = client.get(url, headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 403 data = json.loads(response.get_data()) assert data['message'] == {"token": ['Invalid token: iss field not provided']} -def test_should_not_allow_request_with_no_iat(client, sample_api_key): +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_request_with_no_iat(client, sample_api_key, url): # code copied from notifications_python_client.authentication.py::create_jwt_token headers = { "typ": 'JWT', @@ -78,264 +74,214 @@ def test_should_not_allow_request_with_no_iat(client, sample_api_key): token = jwt.encode(payload=claims, key=str(uuid.uuid4()), headers=headers).decode() - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + response = client.get(url, headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 403 data = json.loads(response.get_data()) assert data['message'] == {"token": ['Invalid token: signature, api token is not valid']} -def test_should_not_allow_invalid_secret(notify_api, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - token = create_jwt_token( - secret="not-so-secret", - client_id=str(sample_api_key.service_id)) - response = client.get( - '/service', - headers={'Authorization': "Bearer {}".format(token)} - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: signature, api token is not valid']} +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_invalid_secret(client, sample_api_key, url): + token = create_jwt_token( + secret="not-so-secret", + client_id=str(sample_api_key.service_id)) + response = client.get( + url, + headers={'Authorization': "Bearer {}".format(token)} + ) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Invalid token: signature, api token is not valid']} @pytest.mark.parametrize('scheme', ['bearer', 'Bearer']) -def test_should_allow_valid_token(notify_api, sample_api_key, scheme): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - token = __create_get_token(sample_api_key.service_id) - response = client.get( - '/service/{}'.format(str(sample_api_key.service_id)), - headers={'Authorization': '{} {}'.format(scheme, token)} - ) - assert response.status_code == 200 +def test_should_allow_valid_token(client, sample_api_key, scheme): + token = __create_token(sample_api_key.service_id) + response = client.get('/notifications', headers={'Authorization': '{} {}'.format(scheme, token)}) + assert response.status_code == 200 -def test_should_not_allow_service_id_that_is_not_the_wrong_data_type(notify_api, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - token = create_jwt_token(secret=get_unsigned_secrets(sample_api_key.service_id)[0], - client_id=str('not-a-valid-id')) - response = client.get( - '/service', - headers={'Authorization': "Bearer {}".format(token)} - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} +@pytest.mark.parametrize('url', ['/service', '/notifications']) +def test_should_not_allow_service_id_that_is_not_the_wrong_data_type(client, sample_api_key, url): + token = create_jwt_token(secret=get_unsigned_secrets(sample_api_key.service_id)[0], + client_id=str('not-a-valid-id')) + response = client.get( + url, + headers={'Authorization': "Bearer {}".format(token)} + ) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Invalid token: service id is not the right data type']} -def test_should_allow_valid_token_for_request_with_path_params(notify_api, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - token = __create_get_token(sample_api_key.service_id) - response = client.get( - '/service/{}'.format(str(sample_api_key.service_id)), - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 +def test_should_allow_valid_token_for_request_with_path_params_for_public_url(client, sample_api_key): + token = __create_token(sample_api_key.service_id) + response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 -def test_should_allow_valid_token_when_service_has_multiple_keys(notify_api, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - data = {'service': sample_api_key.service, - 'name': 'some key name', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - api_key = ApiKey(**data) - save_model_api_key(api_key) - token = __create_get_token(sample_api_key.service_id) - response = client.get( - '/service/{}'.format(str(sample_api_key.service_id)), - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 +def test_should_allow_valid_token_for_request_with_path_params_for_admin_url(client): + token = create_jwt_token(current_app.config['ADMIN_CLIENT_SECRET'], current_app.config['ADMIN_CLIENT_USER_NAME']) + response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 -def test_authentication_passes_admin_client_token(notify_api, - sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - token = create_jwt_token( - secret=current_app.config.get('ADMIN_CLIENT_SECRET'), - client_id=current_app.config.get('ADMIN_CLIENT_USER_NAME')) - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 +def test_should_allow_valid_token_when_service_has_multiple_keys(client, sample_api_key): + data = {'service': sample_api_key.service, + 'name': 'some key name', + 'created_by': sample_api_key.created_by, + 'key_type': KEY_TYPE_NORMAL + } + api_key = ApiKey(**data) + save_model_api_key(api_key) + token = __create_token(sample_api_key.service_id) + response = client.get( + '/notifications'.format(str(sample_api_key.service_id)), + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 def test_authentication_passes_when_service_has_multiple_keys_some_expired( - notify_api, - notify_db, - notify_db_session, + client, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - expired_key_data = {'service': sample_api_key.service, - 'name': 'expired_key', - 'expiry_date': datetime.utcnow(), - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - expired_key = ApiKey(**expired_key_data) - save_model_api_key(expired_key) - another_key = {'service': sample_api_key.service, - 'name': 'another_key', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - api_key = ApiKey(**another_key) - save_model_api_key(api_key) - token = create_jwt_token( - secret=get_unsigned_secret(api_key.id), - client_id=str(sample_api_key.service_id)) - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 200 + expired_key_data = {'service': sample_api_key.service, + 'name': 'expired_key', + 'expiry_date': datetime.utcnow(), + 'created_by': sample_api_key.created_by, + 'key_type': KEY_TYPE_NORMAL + } + expired_key = ApiKey(**expired_key_data) + save_model_api_key(expired_key) + another_key = {'service': sample_api_key.service, + 'name': 'another_key', + 'created_by': sample_api_key.created_by, + 'key_type': KEY_TYPE_NORMAL + } + api_key = ApiKey(**another_key) + save_model_api_key(api_key) + token = create_jwt_token( + secret=get_unsigned_secret(api_key.id), + client_id=str(sample_api_key.service_id)) + response = client.get( + '/notifications', + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 -def test_authentication_returns_token_expired_when_service_uses_expired_key_and_has_multiple_keys(notify_api, - notify_db, - notify_db_session, +def test_authentication_returns_token_expired_when_service_uses_expired_key_and_has_multiple_keys(client, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - expired_key = {'service': sample_api_key.service, - 'name': 'expired_key', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - expired_api_key = ApiKey(**expired_key) - save_model_api_key(expired_api_key) - another_key = {'service': sample_api_key.service, - 'name': 'another_key', - 'created_by': sample_api_key.created_by, - 'key_type': KEY_TYPE_NORMAL - } - api_key = ApiKey(**another_key) - save_model_api_key(api_key) - token = create_jwt_token( - secret=get_unsigned_secret(expired_api_key.id), - client_id=str(sample_api_key.service_id)) - expire_api_key(service_id=sample_api_key.service_id, api_key_id=expired_api_key.id) - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['message'] == {"token": ['Invalid token: API key revoked']} + expired_key = {'service': sample_api_key.service, + 'name': 'expired_key', + 'created_by': sample_api_key.created_by, + 'key_type': KEY_TYPE_NORMAL + } + expired_api_key = ApiKey(**expired_key) + save_model_api_key(expired_api_key) + another_key = {'service': sample_api_key.service, + 'name': 'another_key', + 'created_by': sample_api_key.created_by, + 'key_type': KEY_TYPE_NORMAL + } + api_key = ApiKey(**another_key) + save_model_api_key(api_key) + token = create_jwt_token( + secret=get_unsigned_secret(expired_api_key.id), + client_id=str(sample_api_key.service_id)) + expire_api_key(service_id=sample_api_key.service_id, api_key_id=expired_api_key.id) + response = client.get( + '/notifications', + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['message'] == {"token": ['Invalid token: API key revoked']} -def test_authentication_returns_error_when_admin_client_has_no_secrets(notify_api, - sample_service): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - api_secret = notify_api.config.get('ADMIN_CLIENT_SECRET') - token = create_jwt_token( - secret=api_secret, - client_id=notify_api.config.get('ADMIN_CLIENT_USER_NAME') - ) - notify_api.config['ADMIN_CLIENT_SECRET'] = '' - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {"token": ['Invalid token: signature']} - notify_api.config['ADMIN_CLIENT_SECRET'] = api_secret +def test_authentication_returns_error_when_admin_client_has_no_secrets(client): + api_secret = current_app.config.get('ADMIN_CLIENT_SECRET') + token = create_jwt_token( + secret=api_secret, + client_id=current_app.config.get('ADMIN_CLIENT_USER_NAME') + ) + current_app.config['ADMIN_CLIENT_SECRET'] = '' + response = client.get( + '/service', + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 403 + error_message = json.loads(response.get_data()) + assert error_message['message'] == {"token": ['Invalid token: signature']} + current_app.config['ADMIN_CLIENT_SECRET'] = api_secret def test_authentication_returns_error_when_service_doesnt_exit( - notify_api, + client, sample_api_key ): - with notify_api.test_request_context(), notify_api.test_client() as client: - # get service ID and secret the wrong way around - token = create_jwt_token( - secret=str(sample_api_key.service_id), - client_id=str(sample_api_key.id)) + # get service ID and secret the wrong way around + token = create_jwt_token( + secret=str(sample_api_key.service_id), + client_id=str(sample_api_key.id)) - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)} - ) - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {'token': ['Invalid token: service not found']} + response = client.get( + '/notifications', + headers={'Authorization': 'Bearer {}'.format(token)} + ) + assert response.status_code == 403 + error_message = json.loads(response.get_data()) + assert error_message['message'] == {'token': ['Invalid token: service not found']} def test_authentication_returns_error_when_service_inactive(client, sample_api_key): sample_api_key.service.active = False token = create_jwt_token(secret=str(sample_api_key.id), client_id=str(sample_api_key.service_id)) - response = client.get('/service', headers={'Authorization': 'Bearer {}'.format(token)}) + response = client.get('/notifications', headers={'Authorization': 'Bearer {}'.format(token)}) assert response.status_code == 403 error_message = json.loads(response.get_data()) assert error_message['message'] == {'token': ['Invalid token: service is archived']} -def test_authentication_returns_error_when_service_has_no_secrets(notify_api, +def test_authentication_returns_error_when_service_has_no_secrets(client, sample_service, fake_uuid): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - token = create_jwt_token( - secret=fake_uuid, - client_id=str(sample_service.id)) + token = create_jwt_token( + secret=fake_uuid, + client_id=str(sample_service.id)) - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {'token': ['Invalid token: service has no API keys']} + response = client.get( + '/notifications', + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 403 + error_message = json.loads(response.get_data()) + assert error_message['message'] == {'token': ['Invalid token: service has no API keys']} def test_should_attach_the_current_api_key_to_current_app(notify_api, sample_service, sample_api_key): with notify_api.test_request_context() as context, notify_api.test_client() as client: - with pytest.raises(AttributeError): - print(api_user) - - token = __create_get_token(sample_api_key.service_id) + token = __create_token(sample_api_key.service_id) response = client.get( - '/service/{}'.format(str(sample_api_key.service_id)), + '/notifications', headers={'Authorization': 'Bearer {}'.format(token)} ) assert response.status_code == 200 assert api_user == sample_api_key -def test_should_return_403_when_token_is_expired(notify_api, +def test_should_return_403_when_token_is_expired(client, sample_api_key): - with notify_api.test_request_context(): - with notify_api.test_client() as client: - with freeze_time('2001-01-01T12:00:00'): - token = __create_get_token(sample_api_key.service_id) - with freeze_time('2001-01-01T12:00:40'): - response = client.get( - '/service', - headers={'Authorization': 'Bearer {}'.format(token)}) - assert response.status_code == 403 - error_message = json.loads(response.get_data()) - assert error_message['message'] == {'token': [ - 'Invalid token: expired, check that your system clock is accurate' - ]} + with freeze_time('2001-01-01T12:00:00'): + token = __create_token(sample_api_key.service_id) + with freeze_time('2001-01-01T12:00:40'): + response = client.get( + '/notifications', + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 403 + error_message = json.loads(response.get_data()) + assert error_message['message'] == {'token': [ + 'Invalid token: expired, check that your system clock is accurate' + ]} -def __create_get_token(service_id): - if service_id: - return create_jwt_token(secret=get_unsigned_secrets(service_id)[0], - client_id=str(service_id)) - else: - return create_jwt_token(secret=get_unsigned_secrets(service_id)[0], - client_id=service_id) - - -def __create_post_token(service_id, request_body): - return create_jwt_token( - secret=get_unsigned_secrets(service_id)[0], - client_id=str(service_id) - ) +def __create_token(service_id): + return create_jwt_token(secret=get_unsigned_secrets(service_id)[0], + client_id=str(service_id)) diff --git a/tests/app/service/test_rest.py b/tests/app/service/test_rest.py index 2b8c0b23d..c6a3b5218 100644 --- a/tests/app/service/test_rest.py +++ b/tests/app/service/test_rest.py @@ -1418,7 +1418,7 @@ def test_get_notification_billable_unit_count(client, notify_db, notify_db_sessi notification = create_sample_notification(notify_db, notify_db_session) response = client.get( '/service/{}/billable-units?year=2012'.format(notification.service_id), - headers=[create_authorization_header(service_id=notification.service_id)] + headers=[create_authorization_header()] ) assert response.status_code == 200 assert json.loads(response.get_data(as_text=True)) == { @@ -1429,7 +1429,7 @@ def test_get_notification_billable_unit_count(client, notify_db, notify_db_sessi def test_get_notification_billable_unit_count_missing_year(client, sample_service): response = client.get( '/service/{}/billable-units'.format(sample_service.id), - headers=[create_authorization_header(service_id=sample_service.id)] + headers=[create_authorization_header()] ) assert response.status_code == 400 assert json.loads(response.get_data(as_text=True)) == { @@ -1451,7 +1451,7 @@ def test_get_service_provider_aggregate_statistics( ): response = client.get( '/service/{}/fragment/aggregate_statistics{}'.format(sample_service.id, query_string), - headers=[create_authorization_header(service_id=sample_service.id)] + headers=[create_authorization_header()] ) assert response.status_code == expected_status assert json.loads(response.get_data(as_text=True)) == expected_json @@ -1496,7 +1496,7 @@ def test_get_template_stats_by_month_returns_error_for_incorrect_year( ): response = client.get( '/service/{}/notifications/templates/monthly{}'.format(sample_service.id, query_string), - headers=[create_authorization_header(service_id=sample_service.id)] + headers=[create_authorization_header()] ) assert response.status_code == expected_status assert json.loads(response.get_data(as_text=True)) == expected_json