Files
supervisor/tests/api/test_supervisor.py
2025-10-02 17:48:38 +00:00

243 lines
8.1 KiB
Python

"""Test Supervisor API."""
# pylint: disable=protected-access
import time
from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp.test_utils import TestClient
from blockbuster import BlockingError
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import HassioError, HostNotSupportedError
from tests.api import common_test_api_advanced_logs
from tests.dbus_service_mocks.base import DBusServiceMock
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
# Example repository URL. It is not referenced by any test visible in this part
# of the file — NOTE(review): confirm it is still used further down.
REPO_URL = "https://github.com/awesome-developer/awesome-repo"
async def test_api_supervisor_options_debug(api_client: TestClient, coresys: CoreSys):
    """Test enabling the debug option via the API.

    Posts ``{"debug": True}`` to ``/supervisor/options`` and checks that the
    request succeeds and that ``coresys.config.debug`` goes from falsy to truthy.
    """
    assert not coresys.config.debug

    response = await api_client.post("/supervisor/options", json={"debug": True})

    # Check the status first, like the other options tests in this module, so a
    # rejected request is reported as such and not as an unchanged config value.
    assert response.status == 200
    assert coresys.config.debug
async def test_api_supervisor_options_auto_update(
    api_client: TestClient, coresys: CoreSys
):
    """Verify that auto update can be switched off through the options endpoint."""
    # Auto update is expected to be enabled before the request is made.
    assert coresys.updater.auto_update is True

    payload = {"auto_update": False}
    resp = await api_client.post("/supervisor/options", json=payload)

    assert resp.status == 200
    assert coresys.updater.auto_update is False
async def test_api_supervisor_options_diagnostics(
    api_client: TestClient,
    coresys: CoreSys,
    os_agent_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
    """Toggle diagnostics through the API and check Sentry and OS Agent follow."""
    agent_mock: OSAgentService = os_agent_services["os_agent"]

    # Start from a known state: diagnostics disabled on the mocked OS Agent.
    # NOTE(review): ping() presumably waits for the mock to process the
    # property change — confirm against DBusServiceMock.
    agent_mock.Diagnostics = False
    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is False

    # Turning diagnostics on must call sentry_sdk.init exactly once.
    with patch("supervisor.utils.sentry.sentry_sdk.init") as sentry_init:
        enable_resp = await api_client.post(
            "/supervisor/options", json={"diagnostics": True}
        )
        assert enable_resp.status == 200
        sentry_init.assert_called_once()

    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is True

    # Turning diagnostics off must call close_sentry exactly once.
    with patch("supervisor.api.supervisor.close_sentry") as close_sentry:
        disable_resp = await api_client.post(
            "/supervisor/options", json={"diagnostics": False}
        )
        assert disable_resp.status == 200
        close_sentry.assert_called_once()

    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is False
async def test_api_supervisor_logs(
    api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
    """Run the shared advanced-logs checks against the supervisor endpoints."""
    # NOTE(review): the two strings look like the API path prefix and the
    # journal identifier — confirm against common_test_api_advanced_logs.
    path_prefix = "/supervisor"
    identifier = "hassio_supervisor"

    await common_test_api_advanced_logs(
        path_prefix, identifier, api_client, journald_logs, coresys, os_available
    )
async def _get_logs_expecting_fallback(api_client: TestClient, path: str):
    """Request ``path`` and check that the advanced-logs failure was logged once.

    Asserts that ``supervisor.api._LOGGER.exception`` was called exactly once
    with the expected message and that the reply is a 200 ``text/plain``
    response. Returns the response so the caller can inspect its content.
    """
    with patch("supervisor.api._LOGGER.exception") as logger:
        resp = await api_client.get(path)
        logger.assert_called_once_with(
            "Failed to get supervisor logs using advanced_logs API"
        )

    assert resp.status == 200
    assert resp.content_type == "text/plain"
    return resp


async def test_api_supervisor_fallback(
    api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock
):
    """Check that supervisor logs fall back to the container logs.

    When reading from the journald gateway fails badly, the supervisor log
    endpoints are expected to still answer, using the container logs instead.
    """
    journald_logs.side_effect = HassioError("Something bad happened!")

    resp = await _get_logs_expecting_fallback(api_client, "/supervisor/logs")
    content = await resp.read()
    assert content.split(b"\n")[0:2] == [
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os\x1b[0m",
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os/AppArmor\x1b[0m",
    ]

    # check fallback also works for the /follow and /latest endpoints
    # (no mock reset needed)
    for path in ("/supervisor/logs/follow", "/supervisor/logs/latest"):
        await _get_logs_expecting_fallback(api_client, path)

    # also check generic Python error
    journald_logs.side_effect = OSError("Something bad happened!")
    await _get_logs_expecting_fallback(api_client, "/supervisor/logs")
async def test_api_supervisor_fallback_log_capture(
    api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock
):
    """Check that Sentry capture only happens for unexpected log errors."""
    capture_target = "supervisor.api.async_capture_exception"

    # A host without journald gateway support must not be captured.
    journald_logs.side_effect = HostNotSupportedError(
        "No systemd-journal-gatewayd Unix socket available!"
    )
    with patch(capture_target) as capture_exception:
        await api_client.get("/supervisor/logs")
        capture_exception.assert_not_called()

    journald_logs.reset_mock()

    # A generic HassioError must be captured exactly once.
    journald_logs.side_effect = HassioError("Something bad happened!")
    with patch(capture_target) as capture_exception:
        await api_client.get("/supervisor/logs")
        capture_exception.assert_called_once()
async def test_api_supervisor_reload(
    api_client: TestClient, supervisor_internet: AsyncMock, websession: MagicMock
):
    """Check that reloading the supervisor via the API succeeds."""
    reload_resp = await api_client.post("/supervisor/reload")
    assert reload_resp.status == 200

    # A successful reload answers with an "ok" result and an empty data object.
    payload = await reload_resp.json()
    assert payload == {"result": "ok", "data": {}}
async def test_api_supervisor_options_timezone(
    api_client: TestClient, coresys: CoreSys
):
    """Change the supervisor timezone through the options endpoint."""
    # The timezone is expected to be UTC before the request is made.
    assert coresys.timezone == "Etc/UTC"

    new_options = {"timezone": "Europe/Zurich"}
    result = await api_client.post("/supervisor/options", json=new_options)

    assert result.status == 200
    assert coresys.timezone == "Europe/Zurich"
async def test_api_supervisor_options_country(api_client: TestClient, coresys: CoreSys):
    """Set the country via the options endpoint and read it back from info."""
    assert coresys.config.country is None

    set_resp = await api_client.post("/supervisor/options", json={"country": "CH"})
    assert set_resp.status == 200
    assert coresys.config.country == "CH"

    # The new value must also be reported by the info endpoint.
    info_resp = await api_client.get("/supervisor/info")
    assert info_resp.status == 200
    info = await info_resp.json()
    assert info["data"]["country"] == "CH"
@pytest.mark.parametrize(
    ("blockbuster", "option_value", "config_value"),
    [("no_blockbuster", "on", False), ("no_blockbuster", "on-at-startup", True)],
    indirect=["blockbuster"],
)
async def test_api_supervisor_options_blocking_io(
    api_client: TestClient, coresys: CoreSys, option_value: str, config_value: bool
):
    """Toggle blocking I/O detection through the API and observe its effect."""
    # Before the option is set, a blocking call must not raise.
    time.sleep(0)

    enable_resp = await api_client.post(
        "/supervisor/options", json={"detect_blocking_io": option_value}
    )
    assert enable_resp.status == 200

    info_resp = await api_client.get("/supervisor/info")
    assert info_resp.status == 200
    info = await info_resp.json()
    assert info["data"]["detect_blocking_io"] is True
    # The stored config value depends on the parametrized case: it is False
    # for "on" and True for "on-at-startup" (see ``config_value``).
    assert coresys.config.detect_blocking_io is config_value

    # With detection enabled, the same blocking call must now raise.
    with pytest.raises(BlockingError):
        time.sleep(0)

    disable_resp = await api_client.post(
        "/supervisor/options", json={"detect_blocking_io": "off"}
    )
    assert disable_resp.status == 200

    info_resp = await api_client.get("/supervisor/info")
    assert info_resp.status == 200
    info = await info_resp.json()
    assert info["data"]["detect_blocking_io"] is False
    assert coresys.config.detect_blocking_io is False

    # After switching the option off, the blocking call must not raise again.
    time.sleep(0)