supervisor/tests/api/middleware/test_security.py
Stefan Agner 696dcf6149
Initialize Supervisor Core state in constructor (#5686)
* Initialize Supervisor Core state in constructor

Make sure the Supervisor Core state is set to a value early on. This
makes sure that the state is always of type CoreState, and makes sure
that any use of the state can rely on it being an actual value from the
CoreState enum.

This fixes Sentry filter during early startup, where the state
previously was None. Because of that, the Sentry filter tried to
collect more Context, which lead to an exception and not reporting
errors.

* Fix pytest

It seems that with initializing the state early, the pytest actually
runs a system evaluation with:
Starting system evaluation with state initialize

Before it did that with:
Starting system evaluation with state None

It detects that the container runs as privileged, and declares the
system as unhealthy.

It is unclear to me why coresys.core.healthy was checked in this
context, it doesn't seem useful. Just remove the check, and validate
the state through the getter instead.

* Update supervisor/core.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Make sure Supervisor container is privileged in pytest

With the Supervisor Core state being valid now, some evaluations
now actually run when loading the resolution center. This leads to
Supervisor getting declared unhealthy due to not running in a privileged
container under pytest.

Fake the host container to be privileged to make evaluations not
causing the system to be declared unhealthy under pytest.

* Avoid writing actual Supervisor run state file

With the Supervisor Core state being valid from the very start, we end
up writing a state everytime.

Instead of actually writing a state file, simply validate the the
necessary calls are being made. This is more conform to typical unit
tests and avoids writing a file for every test.

* Extend WebSocket client fixture and use it consistently

Extend the ha_ws_client WebSocket client fixture to set Supervisor Core
into run state and clear all pending messages.

Currently only some tests use the ha_ws_client WebSocket client fixture.
Use it consistently for all tests.

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
2025-02-28 18:01:55 +01:00

220 lines
7.6 KiB
Python

"""Test API security layer."""
import asyncio
from http import HTTPStatus
from unittest.mock import patch
from aiohttp import web
from aiohttp.test_utils import TestClient
import pytest
import urllib3
from supervisor.addons.addon import Addon
from supervisor.api import RestAPI
from supervisor.const import ROLE_ALL, CoreState
from supervisor.coresys import CoreSys
# pylint: disable=redefined-outer-name
async def mock_handler(request):
"""Return OK."""
return web.Response(text="OK")
@pytest.fixture
async def api_system(aiohttp_client, coresys: CoreSys) -> TestClient:
"""Fixture for RestAPI client."""
api = RestAPI(coresys)
api.webapp = web.Application()
with patch("supervisor.docker.supervisor.os") as os:
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
await api.load()
api.webapp.middlewares.append(api.security.block_bad_requests)
api.webapp.middlewares.append(api.security.system_validation)
api.webapp.router.add_get("/{all:.*}", mock_handler)
yield await aiohttp_client(api.webapp)
@pytest.fixture
async def api_token_validation(aiohttp_client, coresys: CoreSys) -> TestClient:
"""Fixture for RestAPI client with token validation middleware."""
api = RestAPI(coresys)
api.webapp = web.Application()
with patch("supervisor.docker.supervisor.os") as os:
os.environ = {"SUPERVISOR_NAME": "hassio_supervisor"}
await api.start()
api.webapp.middlewares.append(api.security.token_validation)
api.webapp.router.add_get("/{all:.*}", mock_handler)
api.webapp.router.add_post("/{all:.*}", mock_handler)
api.webapp.router.add_delete("/{all:.*}", mock_handler)
yield await aiohttp_client(api.webapp)
@pytest.mark.asyncio
async def test_api_security_system_initialize(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.INITIALIZE
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
@pytest.mark.asyncio
async def test_api_security_system_setup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.SETUP
resp = await api_system.get("/supervisor/ping")
result = await resp.json()
assert resp.status == 400
assert result["result"] == "error"
@pytest.mark.asyncio
async def test_api_security_system_running(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.RUNNING
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
@pytest.mark.asyncio
async def test_api_security_system_startup(api_system: TestClient, coresys: CoreSys):
"""Test security."""
coresys.core.state = CoreState.STARTUP
resp = await api_system.get("/supervisor/ping")
assert resp.status == 200
@pytest.mark.asyncio
@pytest.mark.parametrize(
("request_path", "request_params", "fail_on_query_string"),
[
("/proc/self/environ", {}, False),
("/", {"test": "/test/../../api"}, True),
("/", {"test": "test/../../api"}, True),
("/", {"test": "/test/%2E%2E%2f%2E%2E%2fapi"}, True),
("/", {"test": "test/%2E%2E%2f%2E%2E%2fapi"}, True),
("/", {"test": "test/%252E%252E/api"}, True),
("/", {"test": "test/%252E%252E%2fapi"}, True),
(
"/",
{"test": "test/%2525252E%2525252E%2525252f%2525252E%2525252E%2525252fapi"},
True,
),
("/test/.%252E/api", {}, False),
("/test/%252E%252E/api", {}, False),
("/test/%2E%2E%2f%2E%2E%2fapi", {}, False),
("/test/%2525252E%2525252E%2525252f%2525252E%2525252E/api", {}, False),
("/", {"sql": ";UNION SELECT (a, b"}, True),
("/", {"sql": "UNION%20SELECT%20%28a%2C%20b"}, True),
("/UNION%20SELECT%20%28a%2C%20b", {}, False),
("/", {"sql": "concat(..."}, True),
("/", {"xss": "<script >"}, True),
("/<script >", {"xss": ""}, False),
("/%3Cscript%3E", {}, False),
],
)
async def test_bad_requests(
request_path: str,
request_params: dict[str, str],
fail_on_query_string: bool,
api_system: TestClient,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test request paths that should be filtered."""
# Manual params handling
if request_params:
raw_params = "&".join(f"{val}={key}" for val, key in request_params.items())
man_params = f"?{raw_params}"
else:
man_params = ""
http = urllib3.PoolManager()
resp = await asyncio.get_running_loop().run_in_executor(
None,
http.request,
"GET",
f"http://{api_system.host}:{api_system.port}{request_path}{man_params}",
request_params,
)
assert resp.status == HTTPStatus.BAD_REQUEST
message = "Filtered a potential harmful request to:"
if fail_on_query_string:
message = "Filtered a request with a potential harmful query string:"
assert message in caplog.text
@pytest.mark.parametrize(
"request_method,request_path,success_roles",
[
("post", "/auth/reset", {"admin"}),
("get", "/auth/list", {"admin"}),
("delete", "/auth/cache", {"admin", "manager"}),
("get", "/auth", set(ROLE_ALL)),
("post", "/auth", set(ROLE_ALL)),
("get", "/backups/info", set(ROLE_ALL)),
("get", "/backups/abc123/download", {"admin", "manager", "backup"}),
("post", "/backups/new/full", {"admin", "manager", "backup"}),
("post", "/backups/abc123/restore/full", {"admin", "manager", "backup"}),
("get", "/core/info", set(ROLE_ALL)),
("post", "/core/update", {"admin", "manager", "homeassistant"}),
("post", "/core/restart", {"admin", "manager", "homeassistant"}),
("get", "/addons/self/options/config", set(ROLE_ALL)),
("post", "/addons/self/options", set(ROLE_ALL)),
("post", "/addons/self/restart", set(ROLE_ALL)),
("post", "/addons/self/security", {"admin"}),
("get", "/addons/abc123/options/config", {"admin", "manager"}),
("post", "/addons/abc123/options", {"admin", "manager"}),
("post", "/addons/abc123/restart", {"admin", "manager"}),
("post", "/addons/abc123/security", {"admin"}),
("post", "/os/datadisk/wipe", {"admin"}),
("post", "/addons/self/sys_options", set()),
("post", "/addons/abc123/sys_options", set()),
],
)
async def test_token_validation(
api_token_validation: TestClient,
install_addon_example: Addon,
request_method: str,
request_path: str,
success_roles: set[str],
):
"""Test token validation paths."""
install_addon_example.persist["access_token"] = "abc123"
install_addon_example.data["hassio_api"] = True
for role in success_roles:
install_addon_example.data["hassio_role"] = role
resp = await getattr(api_token_validation, request_method)(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 200
for role in set(ROLE_ALL) - success_roles:
install_addon_example.data["hassio_role"] = role
resp = await getattr(api_token_validation, request_method)(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 403
async def test_home_assistant_paths(api_token_validation: TestClient, coresys: CoreSys):
"""Test Home Assistant only paths."""
coresys.homeassistant.supervisor_token = "abc123"
resp = await api_token_validation.post(
"/addons/local_test/sys_options", headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 200