diff --git a/supervisor/misc/filter.py b/supervisor/misc/filter.py index 3e5e94421..796072422 100644 --- a/supervisor/misc/filter.py +++ b/supervisor/misc/filter.py @@ -1,25 +1,42 @@ """Filter tools.""" +import ipaddress import os import re from aiohttp import hdrs import attr -from ..const import HEADER_TOKEN, HEADER_TOKEN_OLD, CoreState +from ..const import DOCKER_NETWORK_MASK, HEADER_TOKEN, HEADER_TOKEN_OLD, CoreState from ..coresys import CoreSys from ..exceptions import AddonConfigurationError RE_URL: re.Pattern = re.compile(r"(\w+:\/\/)(.*\.\w+)(.*)") +def sanitize_host(host: str) -> str: + """Return a sanitized host.""" + try: + # Allow internal URLs + ip = ipaddress.ip_address(host) + if ip in ipaddress.ip_network(DOCKER_NETWORK_MASK): + return host + except ValueError: + pass + + return "sanitized-host.invalid" + + def sanitize_url(url: str) -> str: """Return a sanitized url.""" - if not re.match(RE_URL, url): + match = re.match(RE_URL, url) + if not match: # Not a URL, just return it back return url - return re.sub(RE_URL, r"\1example.com\3", url) + host = sanitize_host(match.group(2)) + + return f"{match.group(1)}{host}{match.group(3)}" def filter_data(coresys: CoreSys, event: dict, hint: dict) -> dict: @@ -107,18 +124,18 @@ def filter_data(coresys: CoreSys, event: dict, hint: dict) -> dict: if event["request"].get("url"): event["request"]["url"] = sanitize_url(event["request"]["url"]) - for i, header in enumerate(event["request"].get("headers", [])): - key, value = header - if key == hdrs.REFERER: - event["request"]["headers"][i] = [key, sanitize_url(value)] - - if key == HEADER_TOKEN: - event["request"]["headers"][i] = [key, "XXXXXXXXXXXXXXXXXXX"] - - if key == HEADER_TOKEN_OLD: - event["request"]["headers"][i] = [key, "XXXXXXXXXXXXXXXXXXX"] - - if key in [hdrs.HOST, hdrs.X_FORWARDED_HOST]: - event["request"]["headers"][i] = [key, "example.com"] + headers = event["request"].get("headers", {}) + if hdrs.REFERER in headers: + headers[hdrs.REFERER] = sanitize_url(headers[hdrs.REFERER]) + if HEADER_TOKEN in headers: + headers[HEADER_TOKEN] = "XXXXXXXXXXXXXXXXXXX" + if HEADER_TOKEN_OLD in headers: + headers[HEADER_TOKEN_OLD] = "XXXXXXXXXXXXXXXXXXX" + if hdrs.HOST in headers: + headers[hdrs.HOST] = sanitize_host(headers[hdrs.HOST]) + if hdrs.X_FORWARDED_HOST in headers: + headers[hdrs.X_FORWARDED_HOST] = sanitize_host( + headers[hdrs.X_FORWARDED_HOST] + ) return event diff --git a/tests/misc/test_filter_data.py b/tests/misc/test_filter_data.py index 299565abb..68219e1a0 100644 --- a/tests/misc/test_filter_data.py +++ b/tests/misc/test_filter_data.py @@ -18,6 +18,60 @@ from supervisor.resolution.const import ( ) SAMPLE_EVENT = {"sample": "event", "extra": {"Test": "123"}} +SAMPLE_EVENT_AIOHTTP_INTERNAL = { + "level": "error", + "request": { + "url": "http://172.30.32.2/supervisor/options", + "query_string": "", + "method": "POST", + "env": {"REMOTE_ADDR": "172.30.32.1"}, + "headers": { + "Host": "172.30.32.2", + "User-Agent": "HomeAssistant/2025.3.0.dev202501310226 aiohttp/3.11.11 Python/3.13", + "Authorization": "[Filtered]", + "X-Hass-Source": "core.handler", + "Accept": "*/*", + "Accept-Encoding": "gzip, deflate, br", + "Content-Length": "20", + "Content-Type": "application/json", + }, + "data": '{"diagnostics":true}', + }, + "platform": "python", +} +SAMPLE_EVENT_AIOHTTP_EXTERNAL = { + "level": "error", + "request": { + "url": "http://debian-supervised-dev.lan:8123/ingress/SRtKwGqE15nF6jbzGCjkM7Nn3_uQlZ08RrJLzLJJQKc/ws", + "query_string": "", + "method": "GET", + "env": {"REMOTE_ADDR": "172.30.32.1"}, + "headers": { + "Host": "debian-supervised-dev.lan:8123", + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:135.0) Gecko/20100101 Firefox/135.0", + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.5", + "Origin": "http://debian-supervised-dev.lan:8123", + "Connection": "keep-alive, Upgrade", + "Cookie": "[Filtered]", + "Pragma": "no-cache", + "Cache-Control": "no-cache", + "Upgrade": "websocket", + "X-Hass-Source": "core.ingress", + "X-Ingress-Path": "/api/hassio_ingress/SRtKwGqE15nF6jbzGCjkM7Nn3_uQlZ08RrJLzLJJQKc", + "X-Forwarded-For": "", + "X-Forwarded-Host": "debian-supervised-dev.lan:8123", + "X-Forwarded-Proto": "http", + "Sec-WebSocket-Version": "13", + "Sec-WebSocket-Key": "BD239eBT8pDIxStE6QO+Qw==", + "Sec-WebSocket-Protocol": "tty", + "Accept-Encoding": "gzip, deflate, br", + "Referer": "http://debian-supervised-dev.lan:8123/somehwere", + }, + "data": None, + }, + "platform": "python", +} @pytest.fixture @@ -81,33 +135,35 @@ def test_defaults(coresys): assert filtered["user"]["id"] == coresys.machine_id -def test_sanitize(coresys): - """Test event sanitation.""" - event = { - "request": { - "url": "https://mydomain.com", - "headers": [ - ["Host", "mydomain.com"], - ["Referer", "https://mydomain.com/api/hassio_ingress/xxx-xxx/"], - ["X-Forwarded-Host", "mydomain.com"], - ["X-Supervisor-Key", "xxx"], - ], - }, - } +def test_sanitize_user_hostname(coresys): + """Test user hostname event sanitation.""" + event = SAMPLE_EVENT_AIOHTTP_EXTERNAL coresys.config.diagnostics = True coresys.core.state = CoreState.RUNNING with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): filtered = filter_data(coresys, event, {}) - assert filtered["request"]["url"] == "https://example.com" + assert "debian-supervised-dev.lan" not in filtered["request"]["url"] - assert ["Host", "example.com"] in filtered["request"]["headers"] - assert ["Referer", "https://example.com/api/hassio_ingress/xxx-xxx/"] in filtered[ - "request" - ]["headers"] - assert ["X-Forwarded-Host", "example.com"] in filtered["request"]["headers"] - assert ["X-Supervisor-Key", "xxx"] in filtered["request"]["headers"] + assert "debian-supervised-dev.lan" not in filtered["request"]["headers"]["Host"] + assert "debian-supervised-dev.lan" not in filtered["request"]["headers"]["Referer"] + assert ( + "debian-supervised-dev.lan" + not in filtered["request"]["headers"]["X-Forwarded-Host"] + ) + + +def test_sanitize_internal(coresys): + """Test internal event sanitation.""" + event = SAMPLE_EVENT_AIOHTTP_INTERNAL + coresys.config.diagnostics = True + + coresys.core.state = CoreState.RUNNING + with patch("shutil.disk_usage", return_value=(42, 42, 2 * (1024.0**3))): + filtered = filter_data(coresys, event, {}) + + assert filtered == event def test_issues_on_report(coresys): diff --git a/tests/misc/test_sanitise_url.py b/tests/misc/test_sanitise_url.py index 67a8a8d11..6e31ab1e3 100644 --- a/tests/misc/test_sanitise_url.py +++ b/tests/misc/test_sanitise_url.py @@ -1,10 +1,22 @@ """Test supervisor.utils.sanitize_url.""" -from supervisor.misc.filter import sanitize_url +from supervisor.misc.filter import sanitize_host, sanitize_url + + +def test_sanitize_host(): + """Test supervisor.utils.sanitize_host.""" + assert sanitize_host("my.duckdns.org") == "sanitized-host.invalid" def test_sanitize_url(): """Test supervisor.utils.sanitize_url.""" assert sanitize_url("test") == "test" - assert sanitize_url("http://my.duckdns.org") == "http://example.com" - assert sanitize_url("http://my.duckdns.org/test") == "http://example.com/test" + assert sanitize_url("http://my.duckdns.org") == "http://sanitized-host.invalid" + assert ( + sanitize_url("http://my.duckdns.org/test") + == "http://sanitized-host.invalid/test" + ) + assert ( + sanitize_url("http://my.duckdns.org/test?test=123") + == "http://sanitized-host.invalid/test?test=123" + )