From c182ceca905df805ef4de49acd6cee6fcc3fb9bc Mon Sep 17 00:00:00 2001 From: venusbb Date: Tue, 4 Jul 2017 17:31:40 +0100 Subject: [PATCH] Check ip unit test and modify ways to parse IP address --- app/authentication/auth.py | 8 ++-- .../app/authentication/test_authentication.py | 45 +++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/app/authentication/auth.py b/app/authentication/auth.py index e5c9a399c..d8f45f5c6 100644 --- a/app/authentication/auth.py +++ b/app/authentication/auth.py @@ -46,10 +46,10 @@ def requires_no_auth(): def restrict_ip_sms(): ip = '' - if request.headers.getlist("X-Forwarded-For"): - ip0 = request.headers.getlist("X-Forwarded-For") - ip1 = ip0.split(',') - ip = ip1[0] + if request.headers.get("X-Forwarded-For"): + # X-Forwarded-For looks like "203.0.113.195, 70.41.3.18, 150.172.238.178" + ip_list = request.headers.get("X-Forwarded-For") + ip = ip_list.split(',')[0].strip() if ip in current_app.config.get('ALLOW_IP_INBOUND_SMS'): current_app.logger.info("Inbound sms ip addresses {} passed ".format(ip)) diff --git a/tests/app/authentication/test_authentication.py b/tests/app/authentication/test_authentication.py index 0895399ad..08c3b33e8 100644 --- a/tests/app/authentication/test_authentication.py +++ b/tests/app/authentication/test_authentication.py @@ -4,6 +4,7 @@ import time from datetime import datetime import pytest +import flask from flask import json, current_app from freezegun import freeze_time from notifications_python_client.authentication import create_jwt_token @@ -11,6 +12,7 @@ from notifications_python_client.authentication import create_jwt_token from app import api_user from app.dao.api_key_dao import get_unsigned_secrets, save_model_api_key, get_unsigned_secret, expire_api_key from app.models import ApiKey, KEY_TYPE_NORMAL +from app.authentication.auth import restrict_ip_sms, AuthError # Test the require_admin_auth and require_auth methods @@ -305,3 +307,46 @@ def test_should_return_403_when_token_is_expired(client, def __create_token(service_id): return create_jwt_token(secret=get_unsigned_secrets(service_id)[0], client_id=str(service_id)) + + +@pytest.fixture +def restrict_ip_sms_app(): + app = flask.Flask(__name__) + app.config['TESTING'] = True + app.config['ALLOW_IP_INBOUND_SMS'] = ['111.111.111.111'] + + blueprint = flask.Blueprint('restrict_ip_sms_app', __name__) + + @blueprint.route('/') + def test_endpoint(): + return 'OK', 200 + + blueprint.before_request(restrict_ip_sms) + app.register_blueprint(blueprint) + + with app.test_request_context(), app.test_client() as client: + yield client + + +def test_allow_valid_ips(restrict_ip_sms_app): + response = restrict_ip_sms_app.get( + path='/', + headers=[ + ('X-Forwarded-For', '111.111.111.111, 222.222.222.222, 127.0.0.1'), + ] + ) + + assert response.status_code == 200 + + +@pytest.mark.xfail(reason='Currently not blocking invalid IPs', strict=True) +def test_reject_invalid_ips(restrict_ip_sms_app): + with pytest.raises(AuthError) as exc_info: + restrict_ip_sms_app.get( + path='/', + headers=[ + ('X-Forwarded-For', '222.222.222.222, 111.111.111.111, 127.0.0.1') + ] + ) + + assert exc_info.value.short_message == 'Unknown source IP address from the SMS provider' \ No newline at end of file