From 5f59b631e16ddb11791245ad73d50d2f681e5931 Mon Sep 17 00:00:00 2001 From: Rebecca Law Date: Thu, 14 Jan 2016 17:45:30 +0000 Subject: [PATCH] Added the before_request so that all calls must have a valid token. Next is to get all the rest tests to pass again. --- app/__init__.py | 7 + app/status/healthcheck.py | 2 +- tests/app/authentication/__init__.py | 0 .../app/authentication/test_authentication.py | 122 +++++++++++++++ tests/app/main/views/test_authentication.py | 141 ------------------ 5 files changed, 130 insertions(+), 142 deletions(-) create mode 100644 tests/app/authentication/__init__.py create mode 100644 tests/app/authentication/test_authentication.py delete mode 100644 tests/app/main/views/test_authentication.py diff --git a/app/__init__.py b/app/__init__.py index 7607fc95c..545ff9b91 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -46,6 +46,13 @@ def init_app(app): if key in os.environ: app.config[key] = convert_to_boolean(os.environ[key]) + @app.before_request + def required_authentication(): + from app.authentication import auth + error = auth.requires_auth() + if error: + return error + def convert_to_boolean(value): """Turn strings to bools if they look like them diff --git a/app/status/healthcheck.py b/app/status/healthcheck.py index 1ab87e0db..0407cabc4 100644 --- a/app/status/healthcheck.py +++ b/app/status/healthcheck.py @@ -4,7 +4,7 @@ from flask import Blueprint status = Blueprint('status', __name__) -@status.route('/_status') +@status.route('/_status', methods=['GET', 'POST']) def show_status(): return jsonify( status="ok", diff --git a/tests/app/authentication/__init__.py b/tests/app/authentication/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py new file mode 100644 index 000000000..bbcdf07c0 --- /dev/null +++ b/tests/app/authentication/test_authentication.py 
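Review bot: `required_authentication` blocks a request only when `auth.requires_auth()` returns something truthy, so the hook fails open for any rejection path in `requires_auth` that returns `None` or another falsy value instead of a response; that contract is not visible here and deserves a comment on the hook, e.g. `# requires_auth() must return an error response on every rejection; None means the token verified and the request proceeds.` The function-local `from app.authentication import auth` presumably exists to avoid a circular import at module load, and a one-line comment saying so would stop someone hoisting it to the top of the file. It is also worth recording that because the hook is registered app-wide inside `init_app`, `/_status` and every other blueprint route now require a bearer token unless an exemption is added. `init_app` continues outside the hunk.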
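Review bot: `methods=['GET', 'POST']` on the health check appears to exist only so the new tests in `tests/app/authentication/test_authentication.py` can exercise the request-body signature path against some route; `show_status` does nothing with a body, so `/_status` now accepts a method it has no use for. Prefer a test-only route registered in the test fixtures, or at least a comment on the decorator such as `# POST accepted only so the request-body token check can be exercised in tests`, so the extra method is not mistaken for a feature. Combined with the new global `before_request` hook, `/_status` also requires a token now, which external monitors typically cannot supply; if that is intentional it should be stated in the module. The body of `show_status` continues outside the hunk.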
@@ -0,0 +1,122 @@ +import pytest +from flask import json, url_for +from client.authentication import create_jwt_token + + +def test_should_not_allow_request_with_no_token(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + response = client.get(url_for('status.show_status')) + assert response.status_code == 401 + data = json.loads(response.get_data()) + assert data['error'] == 'Unauthorized, authentication token must be provided' + + +def test_should_not_allow_request_with_incorrect_header(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + response = client.get(url_for('status.show_status'), + headers={'Authorization': 'Basic 1234'}) + assert response.status_code == 401 + data = json.loads(response.get_data()) + assert data['error'] == 'Unauthorized, authentication bearer scheme must be used' + + +def test_should_not_allow_request_with_incorrect_token(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + response = client.get(url_for('status.show_status'), + headers={'Authorization': 'Bearer 1234'}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['error'] == 'Invalid token: signature' + + +def test_should_not_allow_incorrect_path(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + token = create_jwt_token(request_method="GET", request_path="/bad", secret="secret", client_id="client_id") + response = client.get(url_for('status.show_status'), + headers={'Authorization': "Bearer {}".format(token)}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['error'] == 'Invalid token: request' + + +def test_should_not_allow_incorrect_method(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + token = create_jwt_token(request_method="POST", request_path="/", 
secret="secret", client_id="client_id") + response = client.get(url_for('status.show_status'), + headers={'Authorization': "Bearer {}".format(token)}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['error'] == 'Invalid token: request' + + +def test_should_not_allow_invalid_secret(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + token = create_jwt_token(request_method="POST", request_path="/", secret="not-so-secret", + client_id="client_id") + response = client.get(url_for('status.show_status'), + headers={'Authorization': "Bearer {}".format(token)}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['error'] == 'Invalid token: signature' + + +def test_should_allow_valid_token(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + token = create_jwt_token(request_method="GET", + request_path=url_for('status.show_status'), + secret="secret", + client_id="client_id") + response = client.get(url_for('status.show_status'), + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 + + +def test_should_allow_valid_token_with_post_body(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + json_body = json.dumps({ + "key1": "value1", + "key2": "value2", + "key3": "value3" + }) + token = create_jwt_token( + request_method="POST", + request_path=url_for('status.show_status'), + secret="secret", + client_id="client_id", + request_body=json_body + ) + response = client.post(url_for('status.show_status'), + data=json_body, + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 200 + + +def test_should_not_allow_valid_token_with_invalid_post_body(notify_api): + with notify_api.test_request_context(): + with notify_api.test_client() as client: + json_body = json.dumps({ + "key1": "value1", 
+ "key2": "value2", + "key3": "value3" + }) + token = create_jwt_token( + request_method="POST", + request_path=url_for('status.show_status'), + secret="secret", + client_id="client_id", + request_body=json_body + ) + response = client.post(url_for('status.show_status'), + data="spurious", + headers={'Authorization': 'Bearer {}'.format(token)}) + assert response.status_code == 403 + data = json.loads(response.get_data()) + assert data['error'] == 'Invalid token: payload' diff --git a/tests/app/main/views/test_authentication.py b/tests/app/main/views/test_authentication.py deleted file mode 100644 index cad52d6a6..000000000 --- a/tests/app/main/views/test_authentication.py +++ /dev/null 
@@ -1,141 +0,0 @@ -import pytest -from flask import json -from client.authentication import create_jwt_token - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_request_with_no_token(notify_api): - response = notify_api.test_client().get("/") - assert response.status_code == 401 - data = json.loads(response.get_data()) - assert data['error'] == 'Unauthorized, authentication token must be provided' - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_request_with_incorrect_header(notify_api): - response = notify_api.test_client().get( - "/", - headers={ - 'Authorization': 'Basic 1234' - } - ) - assert response.status_code == 401 - data = json.loads(response.get_data()) - assert data['error'] == 'Unauthorized, authentication bearer scheme must be used' - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_request_with_incorrect_token(notify_api): - response = notify_api.test_client().get( - "/", - headers={ - 'Authorization': 'Bearer 1234' - } - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['error'] == 'Invalid token: signature' - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_incorrect_path(notify_api): - token = create_jwt_token(request_method="GET", request_path="/bad", secret="secret", client_id="client_id") - response = notify_api.test_client().get( - "/", - headers={ - 'Authorization': "Bearer {}".format(token) - } - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['error'] == 'Invalid token: request' - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_incorrect_method(notify_api): - token = create_jwt_token(request_method="POST", request_path="/", secret="secret", client_id="client_id") - response = notify_api.test_client().get( - "/", - headers={ - 'Authorization': "Bearer 
{}".format(token) - } - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['error'] == 'Invalid token: request' - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_invalid_secret(notify_api): - token = create_jwt_token(request_method="POST", request_path="/", secret="not-so-secret", client_id="client_id") - response = notify_api.test_client().get( - "/", - headers={ - 'Authorization': "Bearer {}".format(token) - } - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['error'] == 'Invalid token: signature' - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_allow_valid_token(notify_api): - token = create_jwt_token(request_method="GET", request_path="/", secret="secret", client_id="client_id") - response = notify_api.test_client().get( - "/", - headers={ - 'Authorization': 'Bearer {}'.format(token) - } - ) - assert response.status_code == 200 - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_allow_valid_token_with_post_body(notify_api): - json_body = json.dumps({ - "key1": "value1", - "key2": "value2", - "key3": "value3" - }) - token = create_jwt_token( - request_method="POST", - request_path="/", - secret="secret", - client_id="client_id", - request_body=json_body - ) - response = notify_api.test_client().post( - "/", - data=json_body, - headers={ - 'Authorization': 'Bearer {}'.format(token) - } - ) - assert response.status_code == 200 - - -@pytest.mark.xfail(reason="Authentication to be added.") -def test_should_not_allow_valid_token_with_invalid_post_body(notify_api): - json_body = json.dumps({ - "key1": "value1", - "key2": "value2", - "key3": "value3" - }) - token = create_jwt_token( - request_method="POST", - request_path="/", - secret="secret", - client_id="client_id", - request_body=json_body - ) - response = notify_api.test_client().post( - "/", - data="spurious", - headers={ - 
'Authorization': 'Bearer {}'.format(token) - } - ) - assert response.status_code == 403 - data = json.loads(response.get_data()) - assert data['error'] == 'Invalid token: payload'