mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-03 01:41:05 -05:00
Use ipaddress library for the masked bits
This commit is contained in:
@@ -7,6 +7,8 @@ from notifications_python_client.errors import TokenDecodeError, TokenExpiredErr
|
|||||||
|
|
||||||
from app.dao.services_dao import dao_fetch_service_by_id_with_api_keys
|
from app.dao.services_dao import dao_fetch_service_by_id_with_api_keys
|
||||||
|
|
||||||
|
from ipaddress import IPv4Interface, ip_address
|
||||||
|
|
||||||
|
|
||||||
class AuthError(Exception):
|
class AuthError(Exception):
|
||||||
def __init__(self, message, code):
|
def __init__(self, message, code):
|
||||||
@@ -57,20 +59,23 @@ def restrict_ip_sms():
|
|||||||
ip_route = request.headers.get("X-Forwarded-For")
|
ip_route = request.headers.get("X-Forwarded-For")
|
||||||
ip_list = ip_route.split(',')
|
ip_list = ip_route.split(',')
|
||||||
if len(ip_list) >= 3:
|
if len(ip_list) >= 3:
|
||||||
ip = ip_list[len(ip_list) - 3]
|
inbound_ip = ip_list[len(ip_list) - 3]
|
||||||
current_app.logger.info("Inbound sms ip route list {}"
|
current_app.logger.info("Inbound sms ip route list {}"
|
||||||
.format(ip_route))
|
.format(ip_route))
|
||||||
p0 = ip.split('.')
|
|
||||||
|
|
||||||
# IP whitelist
|
# IP whitelist
|
||||||
allowed_ips = current_app.config.get('SMS_INBOUND_WHITELIST')
|
allowed_ips = current_app.config.get('SMS_INBOUND_WHITELIST')
|
||||||
allowed = False
|
allowed = False
|
||||||
|
|
||||||
for allowed_ip in allowed_ips:
|
for allowed_ip in allowed_ips:
|
||||||
p1 = allowed_ip.split('.')
|
masked_bits = ''
|
||||||
if p0[0] == p1[0] and p0[1] == p1[1] and p0[2] == p1[2]:
|
if (len(allowed_ip.split('/')) > 1):
|
||||||
|
masked_bits = allowed_ip.split('/')[1]
|
||||||
|
inbound_ip_str = inbound_ip + '/' + masked_bits
|
||||||
|
if IPv4Interface(allowed_ip).network == IPv4Interface(inbound_ip_str).network:
|
||||||
allowed = True
|
allowed = True
|
||||||
# return
|
# return
|
||||||
|
break
|
||||||
# else:
|
# else:
|
||||||
# raise AuthError('Unknown source IP address from the SMS provider', 403)
|
# raise AuthError('Unknown source IP address from the SMS provider', 403)
|
||||||
|
|
||||||
|
|||||||
@@ -313,7 +313,7 @@ def __create_token(service_id):
|
|||||||
def restrict_ip_sms_app():
|
def restrict_ip_sms_app():
|
||||||
app = flask.Flask(__name__)
|
app = flask.Flask(__name__)
|
||||||
app.config['TESTING'] = True
|
app.config['TESTING'] = True
|
||||||
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111', '100.100.100.100']
|
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111/32', '200.200.200.200/24']
|
||||||
blueprint = flask.Blueprint('restrict_ip_sms_app', __name__)
|
blueprint = flask.Blueprint('restrict_ip_sms_app', __name__)
|
||||||
|
|
||||||
@blueprint.route('/')
|
@blueprint.route('/')
|
||||||
@@ -369,7 +369,7 @@ def test_allow_valid_ips_24bits(restrict_ip_sms_app):
|
|||||||
response = restrict_ip_sms_app.get(
|
response = restrict_ip_sms_app.get(
|
||||||
path='/',
|
path='/',
|
||||||
headers=[
|
headers=[
|
||||||
('X-Forwarded-For', '111.111.111.119, 222.222.222.222, 127.0.0.1'),
|
('X-Forwarded-For', '200.200.200.222, 222.222.222.222, 127.0.0.1'),
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user