mirror of
https://github.com/GSA/notifications-api.git
synced 2026-02-04 18:31:13 -05:00
Minor change in how we inteprete Incoming IP
This commit is contained in:
@@ -1,14 +1,13 @@
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
|
||||
from flask import request, _request_ctx_stack, current_app, g
|
||||
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
||||
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError, TokenIssuerError
|
||||
from sqlalchemy.exc import DataError
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from notifications_python_client.authentication import decode_jwt_token, get_token_issuer
|
||||
from notifications_python_client.errors import TokenDecodeError, TokenExpiredError, TokenIssuerError
|
||||
|
||||
from app.dao.services_dao import dao_fetch_service_by_id_with_api_keys
|
||||
|
||||
from ipaddress import IPv4Interface, ip_address
|
||||
|
||||
|
||||
class AuthError(Exception):
|
||||
def __init__(self, message, code):
|
||||
@@ -58,36 +57,38 @@ def restrict_ip_sms():
|
||||
# Counting backwards and look at the IP at the 3rd last hop - hence, hop(end-3)
|
||||
ip_route = request.headers.get("X-Forwarded-For")
|
||||
ip_list = ip_route.split(',')
|
||||
if len(ip_list) >= 3:
|
||||
inbound_ip = ip_list[len(ip_list) - 3]
|
||||
|
||||
current_app.logger.info("Inbound sms ip route list {}"
|
||||
.format(ip_route))
|
||||
if len(ip_list) >= 3:
|
||||
inbound_ip = IPv4Address(ip_list[len(ip_list) - 3])
|
||||
|
||||
# IP whitelist
|
||||
allowed_ips = current_app.config.get('SMS_INBOUND_WHITELIST')
|
||||
allowed = False
|
||||
# IP whitelist
|
||||
allowed_ips = current_app.config.get('SMS_INBOUND_WHITELIST')
|
||||
|
||||
for allowed_ip in allowed_ips:
|
||||
masked_bits = ''
|
||||
if (len(allowed_ip.split('/')) > 1):
|
||||
masked_bits = allowed_ip.split('/')[1]
|
||||
inbound_ip_str = inbound_ip + '/' + masked_bits
|
||||
if IPv4Interface(allowed_ip).network == IPv4Interface(inbound_ip_str).network:
|
||||
allowed = True
|
||||
# return
|
||||
break
|
||||
# else:
|
||||
# raise AuthError('Unknown source IP address from the SMS provider', 403)
|
||||
allowed = any(
|
||||
inbound_ip in IPv4Network(allowed_ip)
|
||||
for allowed_ip in allowed_ips
|
||||
)
|
||||
|
||||
current_app.logger.info({
|
||||
'message': 'Inbound sms ip address',
|
||||
'log_contents': {
|
||||
'passed': allowed,
|
||||
'ip_address': ip
|
||||
}
|
||||
})
|
||||
# if allowed:
|
||||
# return
|
||||
# else:
|
||||
# raise AuthError('Unknown source IP address from the SMS provider', 403)
|
||||
|
||||
return
|
||||
current_app.logger.info({
|
||||
'message': 'Inbound sms ip address',
|
||||
'log_contents': {
|
||||
'passed': allowed,
|
||||
'ip_address': ip
|
||||
}
|
||||
})
|
||||
return
|
||||
|
||||
current_app.logger.error('Traffic from unknown source or route, X-Forwarded-For="{}"'.format(
|
||||
request.headers.get("X-Forwarded-For"))
|
||||
)
|
||||
# raise AuthError('Traffic from unknown source or route', 403)
|
||||
|
||||
|
||||
def requires_admin_auth():
|
||||
|
||||
Reference in New Issue
Block a user