mirror of
https://github.com/GSA/notifications-api.git
synced 2025-12-23 08:51:30 -05:00
Merge branch 'master' into add-multiple-reply-to-email-endpoints
This commit is contained in:
@@ -1,9 +1,10 @@
|
|||||||
from flask import request, _request_ctx_stack, current_app, g
|
from ipaddress import IPv4Address, IPv4Network
|
||||||
from sqlalchemy.exc import DataError
|
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
|
||||||
|
|
||||||
|
from flask import request, _request_ctx_stack, current_app, g
|
||||||
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
||||||
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError, TokenIssuerError
|
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError, TokenIssuerError
|
||||||
|
from sqlalchemy.exc import DataError
|
||||||
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
|
|
||||||
from app.dao.services_dao import dao_fetch_service_by_id_with_api_keys
|
from app.dao.services_dao import dao_fetch_service_by_id_with_api_keys
|
||||||
|
|
||||||
@@ -44,31 +45,50 @@ def requires_no_auth():
|
|||||||
|
|
||||||
|
|
||||||
def restrict_ip_sms():
|
def restrict_ip_sms():
|
||||||
|
# Check route of inbound sms (Experimental)
|
||||||
|
# Temporary custom header for route security
|
||||||
|
if request.headers.get("X-Custom-forwarder"):
|
||||||
|
current_app.logger.info("X-Custom-forwarder {}".format(request.headers.get("X-Custom-forwarder")))
|
||||||
|
|
||||||
|
# Check IP of SMS providers
|
||||||
ip = ''
|
ip = ''
|
||||||
if request.headers.get("X-Forwarded-For"):
|
if request.headers.get("X-Forwarded-For"):
|
||||||
# X-Forwarded-For looks like "203.0.113.195, 70.41.3.18, 150.172.238.178"
|
# X-Forwarded-For looks like "203.0.113.195, 70.41.3.18, 150.172.238.178"
|
||||||
# Counting backwards and look at the IP at the 3rd last hop - hence, hop(end-3)
|
# Counting backwards and look at the IP at the 3rd last hop - hence, hop(end-3)
|
||||||
ip_route = request.headers.get("X-Forwarded-For")
|
ip_route = request.headers.get("X-Forwarded-For")
|
||||||
ip_list = ip_route.split(',')
|
ip_list = ip_route.split(',')
|
||||||
if len(ip_list) >= 3:
|
|
||||||
ip = ip_list[len(ip_list) - 3]
|
|
||||||
current_app.logger.info("Inbound sms ip route list {}"
|
current_app.logger.info("Inbound sms ip route list {}"
|
||||||
.format(ip_route))
|
.format(ip_route))
|
||||||
|
if len(ip_list) >= 3:
|
||||||
|
inbound_ip = IPv4Address(ip_list[len(ip_list) - 3])
|
||||||
|
|
||||||
# Temporary custom header for route security - to experiment if the header passes through
|
# IP whitelist
|
||||||
if request.headers.get("X-Custom-forwarder"):
|
allowed_ips = current_app.config.get('SMS_INBOUND_WHITELIST')
|
||||||
current_app.logger.info("X-Custom-forwarder {}".format(request.headers.get("X-Custom-forwarder")))
|
|
||||||
|
|
||||||
current_app.logger.info({
|
allowed = any(
|
||||||
'message': 'Inbound sms ip address',
|
inbound_ip in IPv4Network(allowed_ip)
|
||||||
'log_contents': {
|
for allowed_ip in allowed_ips
|
||||||
'passed': ip in current_app.config.get('SMS_INBOUND_WHITELIST'),
|
)
|
||||||
'ip_address': ip
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
return
|
# if allowed:
|
||||||
# raise AuthError('Unknown source IP address from the SMS provider', 403)
|
# return
|
||||||
|
# else:
|
||||||
|
# raise AuthError('Unknown source IP address from the SMS provider', 403)
|
||||||
|
|
||||||
|
current_app.logger.info({
|
||||||
|
'message': 'Inbound sms ip address',
|
||||||
|
'log_contents': {
|
||||||
|
'passed': allowed,
|
||||||
|
'ip_address': ip
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return
|
||||||
|
|
||||||
|
current_app.logger.error('Traffic from unknown source or route, X-Forwarded-For="{}"'.format(
|
||||||
|
request.headers.get("X-Forwarded-For"))
|
||||||
|
)
|
||||||
|
# raise AuthError('Traffic from unknown source or route', 403)
|
||||||
|
|
||||||
|
|
||||||
def requires_admin_auth():
|
def requires_admin_auth():
|
||||||
|
|||||||
@@ -313,7 +313,7 @@ def __create_token(service_id):
|
|||||||
def restrict_ip_sms_app():
|
def restrict_ip_sms_app():
|
||||||
app = flask.Flask(__name__)
|
app = flask.Flask(__name__)
|
||||||
app.config['TESTING'] = True
|
app.config['TESTING'] = True
|
||||||
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111', '100.100.100.100']
|
app.config['SMS_INBOUND_WHITELIST'] = ['111.111.111.111/32', '200.200.200.0/24']
|
||||||
blueprint = flask.Blueprint('restrict_ip_sms_app', __name__)
|
blueprint = flask.Blueprint('restrict_ip_sms_app', __name__)
|
||||||
|
|
||||||
@blueprint.route('/')
|
@blueprint.route('/')
|
||||||
@@ -362,3 +362,15 @@ def test_illegitimate_ips(restrict_ip_sms_app):
|
|||||||
)
|
)
|
||||||
|
|
||||||
assert exc_info.value.short_message == 'Unknown IP route not from known SMS provider'
|
assert exc_info.value.short_message == 'Unknown IP route not from known SMS provider'
|
||||||
|
|
||||||
|
|
||||||
|
def test_allow_valid_ips_bits(restrict_ip_sms_app):
|
||||||
|
# Test an address that match the first 24 bits only
|
||||||
|
response = restrict_ip_sms_app.get(
|
||||||
|
path='/',
|
||||||
|
headers=[
|
||||||
|
('X-Forwarded-For', '200.200.200.222, 222.222.222.222, 127.0.0.1'),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
|||||||
Reference in New Issue
Block a user