mirror of
https://github.com/GSA/notifications-api.git
synced 2026-01-30 06:21:50 -05:00
Use function from utils to check secret header value
This adds a before_request handler to check whether all incoming requests have the proxy header configured.
This commit is contained in:
@@ -2,17 +2,19 @@ import jwt
|
||||
import uuid
|
||||
import time
|
||||
from datetime import datetime
|
||||
from tests.conftest import set_config_values
|
||||
|
||||
import pytest
|
||||
import flask
|
||||
from flask import json, current_app
|
||||
from freezegun import freeze_time
|
||||
from notifications_python_client.authentication import create_jwt_token
|
||||
from notifications_utils.request_helper import check_proxy_header_before_request
|
||||
|
||||
from app import api_user
|
||||
from app.dao.api_key_dao import get_unsigned_secrets, save_model_api_key, get_unsigned_secret, expire_api_key
|
||||
from app.models import ApiKey, KEY_TYPE_NORMAL
|
||||
from app.authentication.auth import restrict_ip_sms, AuthError, check_route_secret
|
||||
from app.authentication.auth import restrict_ip_sms, AuthError
|
||||
|
||||
|
||||
# Test the require_admin_auth and require_auth methods
|
||||
@@ -374,156 +376,33 @@ def test_allow_valid_ips_bits(restrict_ip_sms_app):
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def route_secret_app_with_key_1_only():
|
||||
app = flask.Flask(__name__)
|
||||
app.config['TESTING'] = True
|
||||
app.config['ROUTE_SECRET_KEY_1'] = "key_1"
|
||||
app.config['ROUTE_SECRET_KEY_2'] = ""
|
||||
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111/32', '200.200.200.0/24']
|
||||
blueprint = flask.Blueprint('route_secret_app_with_key_1_only', __name__)
|
||||
def test_route_correct_secret_key(notify_api, client):
|
||||
with set_config_values(notify_api, {
|
||||
'ROUTE_SECRET_KEY_1': 'key_1',
|
||||
'ROUTE_SECRET_KEY_2': '',
|
||||
'DEBUG': False,
|
||||
}):
|
||||
|
||||
@blueprint.route('/')
|
||||
def test_endpoint():
|
||||
return 'OK', 200
|
||||
|
||||
blueprint.before_request(check_route_secret)
|
||||
app.register_blueprint(blueprint)
|
||||
|
||||
with app.test_request_context(), app.test_client() as client:
|
||||
yield client
|
||||
|
||||
|
||||
def test_route_secret_key_1_is_used(route_secret_app_with_key_1_only):
|
||||
response = route_secret_app_with_key_1_only.get(
|
||||
path='/',
|
||||
headers=[
|
||||
('X-Custom-forwarder', 'key_1'),
|
||||
]
|
||||
)
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 200
|
||||
assert resp_json['key_used'] == 1
|
||||
|
||||
|
||||
# This tests for when we do key rotation when we use both keys
|
||||
@pytest.fixture
|
||||
def route_secret_app_both_keys():
|
||||
app = flask.Flask(__name__)
|
||||
app.config['TESTING'] = True
|
||||
app.config['ROUTE_SECRET_KEY_1'] = "key_1"
|
||||
app.config['ROUTE_SECRET_KEY_2'] = "key_2"
|
||||
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111/32', '200.200.200.0/24']
|
||||
blueprint = flask.Blueprint('route_secret_app_both_keys', __name__)
|
||||
|
||||
@blueprint.route('/')
|
||||
def test_endpoint():
|
||||
return 'OK', 200
|
||||
|
||||
blueprint.before_request(check_route_secret)
|
||||
app.register_blueprint(blueprint)
|
||||
|
||||
with app.test_request_context(), app.test_client() as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.mark.parametrize('secret_header, expected_key_used', [
|
||||
('key_2', 2),
|
||||
('key_1', 1)
|
||||
])
|
||||
def test_can_use_either_secret_route_key(route_secret_app_both_keys, secret_header, expected_key_used):
|
||||
print(secret_header)
|
||||
response = route_secret_app_both_keys.get(
|
||||
path='/',
|
||||
headers=[
|
||||
('X-Custom-forwarder', secret_header),
|
||||
]
|
||||
)
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 200
|
||||
assert resp_json['key_used'] == expected_key_used
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def route_secret_app_with_key_2_only():
|
||||
app = flask.Flask(__name__)
|
||||
app.config['TESTING'] = True
|
||||
app.config['ROUTE_SECRET_KEY_1'] = ""
|
||||
app.config['ROUTE_SECRET_KEY_2'] = "key_2"
|
||||
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111/32', '200.200.200.0/24']
|
||||
blueprint = flask.Blueprint('route_secret_app_with_key_2_only', __name__)
|
||||
|
||||
@blueprint.route('/')
|
||||
def test_endpoint():
|
||||
return 'OK', 200
|
||||
|
||||
blueprint.before_request(check_route_secret)
|
||||
app.register_blueprint(blueprint)
|
||||
|
||||
with app.test_request_context(), app.test_client() as client:
|
||||
yield client
|
||||
|
||||
|
||||
def test_route_secret_key_2_is_used(route_secret_app_with_key_2_only):
|
||||
response = route_secret_app_with_key_2_only.get(
|
||||
path='/',
|
||||
headers=[
|
||||
('X-Custom-Forwarder', 'key_2'),
|
||||
]
|
||||
)
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
assert response.status_code == 200
|
||||
assert resp_json['key_used'] == 2
|
||||
|
||||
|
||||
# TODO: expected to fail because we have not implement blocking yet
|
||||
@pytest.mark.parametrize('header_name, secret', [
|
||||
pytest.mark.xfail(('some-header', 'some-value')),
|
||||
pytest.mark.xfail(('X-Custom-Forwarder', 'wrong-value')),
|
||||
])
|
||||
def test_no_route_secret_raise_403(route_secret_app_with_key_2_only, header_name, secret):
|
||||
|
||||
response = route_secret_app_with_key_2_only.get(
|
||||
path='/',
|
||||
headers=[
|
||||
(header_name, secret)
|
||||
]
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def route_secret_app_with_no_key():
|
||||
app = flask.Flask(__name__)
|
||||
app.config['TESTING'] = True
|
||||
app.config['ROUTE_SECRET_KEY_1'] = ""
|
||||
app.config['ROUTE_SECRET_KEY_2'] = ""
|
||||
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111/32', '200.200.200.0/24']
|
||||
blueprint = flask.Blueprint('route_secret_app_with_no_key', __name__)
|
||||
|
||||
@blueprint.route('/')
|
||||
def test_endpoint():
|
||||
return 'OK', 200
|
||||
|
||||
blueprint.before_request(check_route_secret)
|
||||
app.register_blueprint(blueprint)
|
||||
|
||||
with app.test_request_context(), app.test_client() as client:
|
||||
yield client
|
||||
|
||||
|
||||
# TODO: expected to fail because we have not implement blocking yet
|
||||
@pytest.mark.parametrize('secret', [
|
||||
pytest.mark.xfail('some-header')
|
||||
])
|
||||
def test_route_secret_no_key_set_should_fail(route_secret_app_with_no_key, secret):
|
||||
with pytest.raises(AuthError) as exc_info:
|
||||
response = route_secret_app_with_no_key.get(
|
||||
path='/',
|
||||
response = client.get(
|
||||
path='/_status',
|
||||
headers=[
|
||||
('X-Custom-Forwarder', 'some_value'),
|
||||
('X-Custom-forwarder', 'key_1'),
|
||||
]
|
||||
)
|
||||
resp_json = json.loads(response.get_data(as_text=True))
|
||||
exc_info.value.short_message == 'X-Custom-Forwarder, no secret was set on server'
|
||||
assert resp_json['key_used'] is None
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
def test_route_incorrect_secret_key(notify_api, client):
|
||||
with set_config_values(notify_api, {
|
||||
'ROUTE_SECRET_KEY_1': 'key_1',
|
||||
'ROUTE_SECRET_KEY_2': '',
|
||||
'DEBUG': False,
|
||||
}):
|
||||
|
||||
response = client.get(
|
||||
path='/_status',
|
||||
headers=[
|
||||
('X-Custom-forwarder', 'wrong_key'),
|
||||
]
|
||||
)
|
||||
assert response.status_code == 403
|
||||
|
||||
Reference in New Issue
Block a user