Only use the X-Forwarded-For header if connection is from a trusted network (#15182)

See https://github.com/home-assistant/home-assistant/issues/14345#issuecomment-400854569
This commit is contained in:
Colin O'Dell 2018-06-28 09:16:11 -04:00 committed by Paulus Schoutsen
parent dbb786c548
commit 19f2bbf52f
4 changed files with 31 additions and 10 deletions

View File

@ -180,7 +180,7 @@ class HomeAssistantHTTP(object):
middlewares=[staticresource_middleware])
# This order matters
setup_real_ip(app, use_x_forwarded_for)
setup_real_ip(app, use_x_forwarded_for, trusted_networks)
if is_ban_enabled:
setup_bans(hass, app, login_threshold)

View File

@ -11,18 +11,22 @@ from .const import KEY_REAL_IP
@callback
def setup_real_ip(app, use_x_forwarded_for):
def setup_real_ip(app, use_x_forwarded_for, trusted_networks):
"""Create IP Ban middleware for the app."""
@middleware
async def real_ip_middleware(request, handler):
"""Real IP middleware."""
connected_ip = ip_address(
request.transport.get_extra_info('peername')[0])
request[KEY_REAL_IP] = connected_ip
# Only use the XFF header if enabled, present, and from a trusted proxy
if (use_x_forwarded_for and
X_FORWARDED_FOR in request.headers):
X_FORWARDED_FOR in request.headers and
any(connected_ip in trusted_network
for trusted_network in trusted_networks)):
request[KEY_REAL_IP] = ip_address(
request.headers.get(X_FORWARDED_FOR).split(',')[0])
else:
request[KEY_REAL_IP] = \
ip_address(request.transport.get_extra_info('peername')[0])
return await handler(request)

View File

@ -41,7 +41,7 @@ def app():
"""Fixture to setup a web.Application."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, False)
setup_real_ip(app, False, [])
return app

View File

@ -1,6 +1,7 @@
"""Test real IP middleware."""
from aiohttp import web
from aiohttp.hdrs import X_FORWARDED_FOR
from ipaddress import ip_network
from homeassistant.components.http.real_ip import setup_real_ip
from homeassistant.components.http.const import KEY_REAL_IP
@ -15,7 +16,7 @@ async def test_ignore_x_forwarded_for(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, False)
setup_real_ip(app, False, [])
mock_api_client = await aiohttp_client(app)
@ -27,11 +28,27 @@ async def test_ignore_x_forwarded_for(aiohttp_client):
assert text != '255.255.255.255'
async def test_use_x_forwarded_for(aiohttp_client):
async def test_use_x_forwarded_for_without_trusted_proxy(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, True)
setup_real_ip(app, True, [])
mock_api_client = await aiohttp_client(app)
resp = await mock_api_client.get('/', headers={
X_FORWARDED_FOR: '255.255.255.255'
})
assert resp.status == 200
text = await resp.text()
assert text != '255.255.255.255'
async def test_use_x_forwarded_for_with_trusted_proxy(aiohttp_client):
"""Test that we get the IP from the transport."""
app = web.Application()
app.router.add_get('/', mock_handler)
setup_real_ip(app, True, [ip_network('127.0.0.1')])
mock_api_client = await aiohttp_client(app)