X-Forwarded-For improvements and bug fixes (#15204)

* Use new trusted_proxies setting for X-Forwarded-For whitelist

* Only use the last IP in the header

Per Wikipedia (https://en.wikipedia.org/wiki/X-Forwarded-For#Format):

 > The last IP address is always the IP address that connects to the last proxy,
 > which means it is the most reliable source of information.

* Add two additional tests

* Ignore nonsense header values instead of failing
This commit is contained in:
Colin O'Dell 2018-06-29 16:27:06 -04:00 committed by Paulus Schoutsen
parent c61a652c90
commit fd38caa287
5 changed files with 67 additions and 9 deletions

View File

@ -95,6 +95,7 @@ def setup(hass, yaml_config):
ssl_key=None,
cors_origins=None,
use_x_forwarded_for=False,
trusted_proxies=[],
trusted_networks=[],
login_threshold=0,
is_ban_enabled=False

View File

@ -44,6 +44,7 @@ CONF_SSL_PEER_CERTIFICATE = 'ssl_peer_certificate'
CONF_SSL_KEY = 'ssl_key'
CONF_CORS_ORIGINS = 'cors_allowed_origins'
CONF_USE_X_FORWARDED_FOR = 'use_x_forwarded_for'
CONF_TRUSTED_PROXIES = 'trusted_proxies'
CONF_TRUSTED_NETWORKS = 'trusted_networks'
CONF_LOGIN_ATTEMPTS_THRESHOLD = 'login_attempts_threshold'
CONF_IP_BAN_ENABLED = 'ip_ban_enabled'
@ -86,6 +87,8 @@ HTTP_SCHEMA = vol.Schema({
vol.Optional(CONF_CORS_ORIGINS, default=[]):
vol.All(cv.ensure_list, [cv.string]),
vol.Optional(CONF_USE_X_FORWARDED_FOR, default=False): cv.boolean,
vol.Optional(CONF_TRUSTED_PROXIES, default=[]):
vol.All(cv.ensure_list, [ip_network]),
vol.Optional(CONF_TRUSTED_NETWORKS, default=[]):
vol.All(cv.ensure_list, [ip_network]),
vol.Optional(CONF_LOGIN_ATTEMPTS_THRESHOLD,
@ -114,6 +117,7 @@ async def async_setup(hass, config):
ssl_key = conf.get(CONF_SSL_KEY)
cors_origins = conf[CONF_CORS_ORIGINS]
use_x_forwarded_for = conf[CONF_USE_X_FORWARDED_FOR]
trusted_proxies = conf[CONF_TRUSTED_PROXIES]
trusted_networks = conf[CONF_TRUSTED_NETWORKS]
is_ban_enabled = conf[CONF_IP_BAN_ENABLED]
login_threshold = conf[CONF_LOGIN_ATTEMPTS_THRESHOLD]
@ -132,6 +136,7 @@ async def async_setup(hass, config):
ssl_key=ssl_key,
cors_origins=cors_origins,
use_x_forwarded_for=use_x_forwarded_for,
trusted_proxies=trusted_proxies,
trusted_networks=trusted_networks,
login_threshold=login_threshold,
is_ban_enabled=is_ban_enabled
@ -173,14 +178,14 @@ class HomeAssistantHTTP(object):
def __init__(self, hass, api_password,
ssl_certificate, ssl_peer_certificate,
ssl_key, server_host, server_port, cors_origins,
use_x_forwarded_for, trusted_networks,
use_x_forwarded_for, trusted_proxies, trusted_networks,
login_threshold, is_ban_enabled):
"""Initialize the HTTP Home Assistant server."""
app = self.app = web.Application(
middlewares=[staticresource_middleware])
# This order matters
setup_real_ip(app, use_x_forwarded_for, trusted_networks)
setup_real_ip(app, use_x_forwarded_for, trusted_proxies)
if is_ban_enabled:
setup_bans(hass, app, login_threshold)

View File

@ -11,7 +11,7 @@ from .const import KEY_REAL_IP
@callback
def setup_real_ip(app, use_x_forwarded_for, trusted_networks):
def setup_real_ip(app, use_x_forwarded_for, trusted_proxies):
"""Create IP Ban middleware for the app."""
@middleware
async def real_ip_middleware(request, handler):
@ -21,12 +21,15 @@ def setup_real_ip(app, use_x_forwarded_for, trusted_networks):
request[KEY_REAL_IP] = connected_ip
# Only use the XFF header if enabled, present, and from a trusted proxy
if (use_x_forwarded_for and
X_FORWARDED_FOR in request.headers and
any(connected_ip in trusted_network
for trusted_network in trusted_networks)):
request[KEY_REAL_IP] = ip_address(
request.headers.get(X_FORWARDED_FOR).split(',')[0])
try:
if (use_x_forwarded_for and
X_FORWARDED_FOR in request.headers and
any(connected_ip in trusted_proxy
for trusted_proxy in trusted_proxies)):
request[KEY_REAL_IP] = ip_address(
request.headers.get(X_FORWARDED_FOR).split(', ')[-1])
except ValueError:
pass
return await handler(request)

View File

@ -58,3 +58,51 @@ async def test_use_x_forwarded_for_with_trusted_proxy(aiohttp_client):
assert resp.status == 200
text = await resp.text()
assert text == '255.255.255.255'
async def test_use_x_forwarded_for_with_untrusted_proxy(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, True, [ip_network('1.1.1.1')])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get('/', headers={
X_FORWARDED_FOR: '255.255.255.255'
})
assert resp.status == 200
text = await resp.text()
assert text != '255.255.255.255'
async def test_use_x_forwarded_for_with_spoofed_header(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, True, [ip_network('127.0.0.1')])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get('/', headers={
X_FORWARDED_FOR: '222.222.222.222, 255.255.255.255'
})
assert resp.status == 200
text = await resp.text()
assert text == '255.255.255.255'
async def test_use_x_forwarded_for_with_nonsense_header(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, True, [ip_network('127.0.0.1')])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get('/', headers={
X_FORWARDED_FOR: 'This value is invalid'
})
assert resp.status == 200
text = await resp.text()
assert text == '127.0.0.1'

View File

@ -160,6 +160,7 @@ class TestCheckConfig(unittest.TestCase):
'server_host': '0.0.0.0',
'server_port': 8123,
'trusted_networks': [],
'trusted_proxies': [],
'use_x_forwarded_for': False}
assert res['secret_cache'] == {secrets_path: {'http_pw': 'abc123'}}
assert res['secrets'] == {'http_pw': 'abc123'}