mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 21:27:38 +00:00
Http: Change approved_ips from string to cidr validation (#3532) [BREAKING CHANGE]
* Change approved_ips from string to cidr validation Relabel to trusted_networks, better reflecting its expected inputs, everything that ipaddress.ip_networks recognizes as an ip network is possible: - 127.0.0.1 (single ipv4 addresses) - 192.168.0.0/24 (ipv4 networks) - ::1 (single ipv6 addresses) - 2001:DB8::/48 (ipv6 networks) * Add support for the X-Forwarded-For header
This commit is contained in:
parent
dc95b28487
commit
154eacef6c
@ -77,7 +77,7 @@ def setup(hass, yaml_config):
|
|||||||
ssl_certificate=None,
|
ssl_certificate=None,
|
||||||
ssl_key=None,
|
ssl_key=None,
|
||||||
cors_origins=[],
|
cors_origins=[],
|
||||||
approved_ips=[]
|
trusted_networks=[]
|
||||||
)
|
)
|
||||||
|
|
||||||
server.register_view(DescriptionXmlView(hass, config))
|
server.register_view(DescriptionXmlView(hass, config))
|
||||||
|
@ -211,8 +211,14 @@ class IndexView(HomeAssistantView):
|
|||||||
|
|
||||||
panel_url = PANELS[panel]['url'] if panel != 'states' else ''
|
panel_url = PANELS[panel]['url'] if panel != 'states' else ''
|
||||||
|
|
||||||
# auto login if no password was set
|
no_auth = 'true'
|
||||||
no_auth = 'false' if self.hass.config.api.api_password else 'true'
|
if self.hass.config.api.api_password:
|
||||||
|
# require password if set
|
||||||
|
no_auth = 'false'
|
||||||
|
if self.hass.wsgi.is_trusted_ip(
|
||||||
|
self.hass.wsgi.get_real_ip(request)):
|
||||||
|
# bypass for trusted networks
|
||||||
|
no_auth = 'true'
|
||||||
|
|
||||||
icons_url = '/static/mdi-{}.html'.format(FINGERPRINTS['mdi.html'])
|
icons_url = '/static/mdi-{}.html'.format(FINGERPRINTS['mdi.html'])
|
||||||
template = self.templates.get_template('index.html')
|
template = self.templates.get_template('index.html')
|
||||||
|
@ -11,6 +11,7 @@ import mimetypes
|
|||||||
import threading
|
import threading
|
||||||
import re
|
import re
|
||||||
import ssl
|
import ssl
|
||||||
|
from ipaddress import ip_address, ip_network
|
||||||
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
@ -30,13 +31,13 @@ DOMAIN = 'http'
|
|||||||
REQUIREMENTS = ('cherrypy==8.1.0', 'static3==0.7.0', 'Werkzeug==0.11.11')
|
REQUIREMENTS = ('cherrypy==8.1.0', 'static3==0.7.0', 'Werkzeug==0.11.11')
|
||||||
|
|
||||||
CONF_API_PASSWORD = 'api_password'
|
CONF_API_PASSWORD = 'api_password'
|
||||||
CONF_APPROVED_IPS = 'approved_ips'
|
|
||||||
CONF_SERVER_HOST = 'server_host'
|
CONF_SERVER_HOST = 'server_host'
|
||||||
CONF_SERVER_PORT = 'server_port'
|
CONF_SERVER_PORT = 'server_port'
|
||||||
CONF_DEVELOPMENT = 'development'
|
CONF_DEVELOPMENT = 'development'
|
||||||
CONF_SSL_CERTIFICATE = 'ssl_certificate'
|
CONF_SSL_CERTIFICATE = 'ssl_certificate'
|
||||||
CONF_SSL_KEY = 'ssl_key'
|
CONF_SSL_KEY = 'ssl_key'
|
||||||
CONF_CORS_ORIGINS = 'cors_allowed_origins'
|
CONF_CORS_ORIGINS = 'cors_allowed_origins'
|
||||||
|
CONF_TRUSTED_NETWORKS = 'trusted_networks'
|
||||||
|
|
||||||
DATA_API_PASSWORD = 'api_password'
|
DATA_API_PASSWORD = 'api_password'
|
||||||
NOTIFICATION_ID_LOGIN = 'http-login'
|
NOTIFICATION_ID_LOGIN = 'http-login'
|
||||||
@ -76,7 +77,8 @@ CONFIG_SCHEMA = vol.Schema({
|
|||||||
vol.Optional(CONF_SSL_CERTIFICATE): cv.isfile,
|
vol.Optional(CONF_SSL_CERTIFICATE): cv.isfile,
|
||||||
vol.Optional(CONF_SSL_KEY): cv.isfile,
|
vol.Optional(CONF_SSL_KEY): cv.isfile,
|
||||||
vol.Optional(CONF_CORS_ORIGINS): vol.All(cv.ensure_list, [cv.string]),
|
vol.Optional(CONF_CORS_ORIGINS): vol.All(cv.ensure_list, [cv.string]),
|
||||||
vol.Optional(CONF_APPROVED_IPS): vol.All(cv.ensure_list, [cv.string])
|
vol.Optional(CONF_TRUSTED_NETWORKS):
|
||||||
|
vol.All(cv.ensure_list, [ip_network])
|
||||||
}),
|
}),
|
||||||
}, extra=vol.ALLOW_EXTRA)
|
}, extra=vol.ALLOW_EXTRA)
|
||||||
|
|
||||||
@ -113,7 +115,9 @@ def setup(hass, config):
|
|||||||
ssl_certificate = conf.get(CONF_SSL_CERTIFICATE)
|
ssl_certificate = conf.get(CONF_SSL_CERTIFICATE)
|
||||||
ssl_key = conf.get(CONF_SSL_KEY)
|
ssl_key = conf.get(CONF_SSL_KEY)
|
||||||
cors_origins = conf.get(CONF_CORS_ORIGINS, [])
|
cors_origins = conf.get(CONF_CORS_ORIGINS, [])
|
||||||
approved_ips = conf.get(CONF_APPROVED_IPS, [])
|
trusted_networks = [
|
||||||
|
ip_network(trusted_network)
|
||||||
|
for trusted_network in conf.get(CONF_TRUSTED_NETWORKS, [])]
|
||||||
|
|
||||||
server = HomeAssistantWSGI(
|
server = HomeAssistantWSGI(
|
||||||
hass,
|
hass,
|
||||||
@ -124,7 +128,7 @@ def setup(hass, config):
|
|||||||
ssl_certificate=ssl_certificate,
|
ssl_certificate=ssl_certificate,
|
||||||
ssl_key=ssl_key,
|
ssl_key=ssl_key,
|
||||||
cors_origins=cors_origins,
|
cors_origins=cors_origins,
|
||||||
approved_ips=approved_ips
|
trusted_networks=trusted_networks
|
||||||
)
|
)
|
||||||
|
|
||||||
def start_wsgi_server(event):
|
def start_wsgi_server(event):
|
||||||
@ -257,7 +261,7 @@ class HomeAssistantWSGI(object):
|
|||||||
|
|
||||||
def __init__(self, hass, development, api_password, ssl_certificate,
|
def __init__(self, hass, development, api_password, ssl_certificate,
|
||||||
ssl_key, server_host, server_port, cors_origins,
|
ssl_key, server_host, server_port, cors_origins,
|
||||||
approved_ips):
|
trusted_networks):
|
||||||
"""Initilalize the WSGI Home Assistant server."""
|
"""Initilalize the WSGI Home Assistant server."""
|
||||||
from werkzeug.wrappers import Response
|
from werkzeug.wrappers import Response
|
||||||
|
|
||||||
@ -276,7 +280,7 @@ class HomeAssistantWSGI(object):
|
|||||||
self.server_host = server_host
|
self.server_host = server_host
|
||||||
self.server_port = server_port
|
self.server_port = server_port
|
||||||
self.cors_origins = cors_origins
|
self.cors_origins = cors_origins
|
||||||
self.approved_ips = approved_ips
|
self.trusted_networks = trusted_networks
|
||||||
self.event_forwarder = None
|
self.event_forwarder = None
|
||||||
self.server = None
|
self.server = None
|
||||||
|
|
||||||
@ -431,6 +435,19 @@ class HomeAssistantWSGI(object):
|
|||||||
environ['PATH_INFO'] = '{}.{}'.format(*fingerprinted.groups())
|
environ['PATH_INFO'] = '{}.{}'.format(*fingerprinted.groups())
|
||||||
return app(environ, start_response)
|
return app(environ, start_response)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_real_ip(request):
|
||||||
|
"""Return the clients correct ip address, even in proxied setups."""
|
||||||
|
if request.access_route:
|
||||||
|
return request.access_route[-1]
|
||||||
|
else:
|
||||||
|
return request.remote_addr
|
||||||
|
|
||||||
|
def is_trusted_ip(self, remote_addr):
|
||||||
|
"""Match an ip address against trusted CIDR networks."""
|
||||||
|
return any(ip_address(remote_addr) in trusted_network
|
||||||
|
for trusted_network in self.hass.wsgi.trusted_networks)
|
||||||
|
|
||||||
|
|
||||||
class HomeAssistantView(object):
|
class HomeAssistantView(object):
|
||||||
"""Base view for all views."""
|
"""Base view for all views."""
|
||||||
@ -471,13 +488,15 @@ class HomeAssistantView(object):
|
|||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise MethodNotAllowed
|
raise MethodNotAllowed
|
||||||
|
|
||||||
|
remote_addr = HomeAssistantWSGI.get_real_ip(request)
|
||||||
|
|
||||||
# Auth code verbose on purpose
|
# Auth code verbose on purpose
|
||||||
authenticated = False
|
authenticated = False
|
||||||
|
|
||||||
if self.hass.wsgi.api_password is None:
|
if self.hass.wsgi.api_password is None:
|
||||||
authenticated = True
|
authenticated = True
|
||||||
|
|
||||||
elif request.remote_addr in self.hass.wsgi.approved_ips:
|
elif self.hass.wsgi.is_trusted_ip(remote_addr):
|
||||||
authenticated = True
|
authenticated = True
|
||||||
|
|
||||||
elif hmac.compare_digest(request.headers.get(HTTP_HEADER_HA_AUTH, ''),
|
elif hmac.compare_digest(request.headers.get(HTTP_HEADER_HA_AUTH, ''),
|
||||||
@ -491,17 +510,17 @@ class HomeAssistantView(object):
|
|||||||
|
|
||||||
if self.requires_auth and not authenticated:
|
if self.requires_auth and not authenticated:
|
||||||
_LOGGER.warning('Login attempt or request with an invalid '
|
_LOGGER.warning('Login attempt or request with an invalid '
|
||||||
'password from %s', request.remote_addr)
|
'password from %s', remote_addr)
|
||||||
persistent_notification.create(
|
persistent_notification.create(
|
||||||
self.hass,
|
self.hass,
|
||||||
'Invalid password used from {}'.format(request.remote_addr),
|
'Invalid password used from {}'.format(remote_addr),
|
||||||
'Login attempt failed', NOTIFICATION_ID_LOGIN)
|
'Login attempt failed', NOTIFICATION_ID_LOGIN)
|
||||||
raise Unauthorized()
|
raise Unauthorized()
|
||||||
|
|
||||||
request.authenticated = authenticated
|
request.authenticated = authenticated
|
||||||
|
|
||||||
_LOGGER.info('Serving %s to %s (auth: %s)',
|
_LOGGER.info('Serving %s to %s (auth: %s)',
|
||||||
request.path, request.remote_addr, authenticated)
|
request.path, remote_addr, authenticated)
|
||||||
|
|
||||||
result = handler(request, **values)
|
result = handler(request, **values)
|
||||||
|
|
||||||
|
@ -2,6 +2,8 @@
|
|||||||
# pylint: disable=protected-access,too-many-public-methods
|
# pylint: disable=protected-access,too-many-public-methods
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
from ipaddress import ip_network
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
@ -18,6 +20,11 @@ HA_HEADERS = {
|
|||||||
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
|
const.HTTP_HEADER_HA_AUTH: API_PASSWORD,
|
||||||
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
|
const.HTTP_HEADER_CONTENT_TYPE: const.CONTENT_TYPE_JSON,
|
||||||
}
|
}
|
||||||
|
# dont' add 127.0.0.1/::1 as trusted, as it may interfere with other test cases
|
||||||
|
TRUSTED_NETWORKS = ["192.0.2.0/24",
|
||||||
|
"2001:DB8:ABCD::/48",
|
||||||
|
'100.64.0.1',
|
||||||
|
'FD01:DB8::1']
|
||||||
|
|
||||||
CORS_ORIGINS = [HTTP_BASE_URL, HTTP_BASE]
|
CORS_ORIGINS = [HTTP_BASE_URL, HTTP_BASE]
|
||||||
|
|
||||||
@ -46,6 +53,10 @@ def setUpModule(): # pylint: disable=invalid-name
|
|||||||
|
|
||||||
bootstrap.setup_component(hass, 'api')
|
bootstrap.setup_component(hass, 'api')
|
||||||
|
|
||||||
|
hass.wsgi.trusted_networks = [
|
||||||
|
ip_network(trusted_network)
|
||||||
|
for trusted_network in TRUSTED_NETWORKS]
|
||||||
|
|
||||||
hass.start()
|
hass.start()
|
||||||
time.sleep(0.05)
|
time.sleep(0.05)
|
||||||
|
|
||||||
@ -72,14 +83,21 @@ class TestHttp:
|
|||||||
|
|
||||||
assert req.status_code == 401
|
assert req.status_code == 401
|
||||||
|
|
||||||
def test_access_denied_with_ip_no_in_approved_ips(self, caplog):
|
def test_access_denied_with_untrusted_ip(self, caplog):
|
||||||
"""Test access deniend with ip not in approved ip."""
|
"""Test access with an untrusted ip address."""
|
||||||
hass.wsgi.approved_ips = ['134.4.56.1']
|
|
||||||
|
|
||||||
req = requests.get(_url(const.URL_API),
|
for remote_addr in ['198.51.100.1',
|
||||||
params={'api_password': ''})
|
'2001:DB8:FA1::1',
|
||||||
|
'127.0.0.1',
|
||||||
|
'::1']:
|
||||||
|
with patch('homeassistant.components.http.'
|
||||||
|
'HomeAssistantWSGI.get_real_ip',
|
||||||
|
return_value=remote_addr):
|
||||||
|
req = requests.get(_url(const.URL_API),
|
||||||
|
params={'api_password': ''})
|
||||||
|
|
||||||
assert req.status_code == 401
|
assert req.status_code == 401, \
|
||||||
|
"{} shouldn't be trusted".format(remote_addr)
|
||||||
|
|
||||||
def test_access_with_password_in_header(self, caplog):
|
def test_access_with_password_in_header(self, caplog):
|
||||||
"""Test access with password in URL."""
|
"""Test access with password in URL."""
|
||||||
@ -121,14 +139,20 @@ class TestHttp:
|
|||||||
# assert const.URL_API in logs
|
# assert const.URL_API in logs
|
||||||
assert API_PASSWORD not in logs
|
assert API_PASSWORD not in logs
|
||||||
|
|
||||||
def test_access_with_ip_in_approved_ips(self, caplog):
|
def test_access_with_trusted_ip(self, caplog):
|
||||||
"""Test access with approved ip."""
|
"""Test access with trusted addresses."""
|
||||||
hass.wsgi.approved_ips = ['127.0.0.1', '134.4.56.1']
|
for remote_addr in ['100.64.0.1',
|
||||||
|
'192.0.2.100',
|
||||||
|
'FD01:DB8::1',
|
||||||
|
'2001:DB8:ABCD::1']:
|
||||||
|
with patch('homeassistant.components.http.'
|
||||||
|
'HomeAssistantWSGI.get_real_ip',
|
||||||
|
return_value=remote_addr):
|
||||||
|
req = requests.get(_url(const.URL_API),
|
||||||
|
params={'api_password': ''})
|
||||||
|
|
||||||
req = requests.get(_url(const.URL_API),
|
assert req.status_code == 200, \
|
||||||
params={'api_password': ''})
|
"{} should be trusted".format(remote_addr)
|
||||||
|
|
||||||
assert req.status_code == 200
|
|
||||||
|
|
||||||
def test_cors_allowed_with_password_in_url(self):
|
def test_cors_allowed_with_password_in_url(self):
|
||||||
"""Test cross origin resource sharing with password in url."""
|
"""Test cross origin resource sharing with password in url."""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user