mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-11-27 11:38:07 +00:00
* Strip ANSI escape color sequences from /latest log responses Strip ANSI sequences of CSI commands [1] used for log coloring from /latest log endpoints. These endpoints were primarily designed for log downloads and colors are mostly not wanted in those. Add optional argument for stripping the colors from the logs and enable it for the /latest endpoints. [1] https://en.wikipedia.org/wiki/ANSI_escape_code#CSIsection * Refactor advanced logs' tests to use fixture factory Introduce `advanced_logs_tester` fixture to simplify testing of advanced logs in the API tests, declaring all the needed fixtures in a single place. # Please enter the commit message for your changes. Lines starting
410 lines
14 KiB
Python
410 lines
14 KiB
Python
"""Test Supervisor API."""
|
|
|
|
# pylint: disable=protected-access
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
|
|
|
from aiohttp.test_utils import TestClient
|
|
from awesomeversion import AwesomeVersion
|
|
from blockbuster import BlockingError
|
|
import pytest
|
|
|
|
from supervisor.const import CoreState
|
|
from supervisor.core import Core
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.exceptions import HassioError, HostNotSupportedError, StoreGitError
|
|
from supervisor.homeassistant.const import WSEvent
|
|
from supervisor.store.repository import Repository
|
|
from supervisor.supervisor import Supervisor
|
|
from supervisor.updater import Updater
|
|
|
|
from tests.common import AsyncIterator, load_json_fixture
|
|
from tests.dbus_service_mocks.base import DBusServiceMock
|
|
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
|
|
|
|
# Repository URL posted as "addons_repositories" by the repository tests in this module.
REPO_URL = "https://github.com/awesome-developer/awesome-repo"
|
|
|
|
|
|
async def test_api_supervisor_options_debug(api_client: TestClient, coresys: CoreSys):
    """Test enabling debug mode via POST /supervisor/options REST API."""
    assert not coresys.config.debug

    response = await api_client.post("/supervisor/options", json={"debug": True})

    # Check the HTTP status first so a rejected request is reported as such
    # instead of surfacing only as an unchanged config flag.
    assert response.status == 200
    assert coresys.config.debug
|
|
|
|
|
|
async def test_api_supervisor_options_add_repository(
    api_client: TestClient, coresys: CoreSys, supervisor_internet: AsyncMock
):
    """Check that POST /supervisor/options adds a repository URL to the store."""
    assert REPO_URL not in coresys.store.repository_urls

    payload = {"addons_repositories": [REPO_URL]}

    # Replace the git load and validation steps with stubs that succeed.
    load_patch = patch(
        "supervisor.store.repository.RepositoryGit.load", return_value=None
    )
    validate_patch = patch(
        "supervisor.store.repository.RepositoryGit.validate", return_value=True
    )
    with load_patch, validate_patch:
        reply = await api_client.post("/supervisor/options", json=payload)

    assert reply.status == 200
    assert REPO_URL in coresys.store.repository_urls
|
|
|
|
|
|
async def test_api_supervisor_options_remove_repository(
    api_client: TestClient, coresys: CoreSys, test_repository: Repository
):
    """Check that posting an empty repository list removes the test repository."""
    # Precondition: the fixture repository is known by both URL and slug.
    assert test_repository.source in coresys.store.repository_urls
    assert test_repository.slug in coresys.store.repositories

    payload = {"addons_repositories": []}
    reply = await api_client.post("/supervisor/options", json=payload)

    assert reply.status == 200
    assert test_repository.source not in coresys.store.repository_urls
    assert test_repository.slug not in coresys.store.repositories
|
|
|
|
|
|
@pytest.mark.parametrize("git_error", [None, StoreGitError()])
async def test_api_supervisor_options_repositories_skipped_on_error(
    api_client: TestClient, coresys: CoreSys, git_error: StoreGitError
):
    """Check that a repository failing load or validation is rejected and not stored."""
    payload = {"addons_repositories": [REPO_URL]}

    # load raises the parametrized error (or nothing); validate always reports failure.
    load_patch = patch(
        "supervisor.store.repository.RepositoryGit.load", side_effect=git_error
    )
    validate_patch = patch(
        "supervisor.store.repository.RepositoryGit.validate", return_value=False
    )
    remove_patch = patch("supervisor.store.repository.RepositoryCustom.remove")
    with load_patch, validate_patch, remove_patch:
        reply = await api_client.post("/supervisor/options", json=payload)

    assert reply.status == 400
    assert len(coresys.resolution.suggestions) == 0
    assert REPO_URL not in coresys.store.repository_urls
|
|
|
|
|
|
async def test_api_supervisor_options_repo_error_with_config_change(
    api_client: TestClient, coresys: CoreSys
):
    """Check that a config change is kept and saved even when adding the repository fails."""
    assert not coresys.config.debug

    payload = {"debug": True, "addons_repositories": [REPO_URL]}
    failing_load = patch(
        "supervisor.store.repository.RepositoryGit.load", side_effect=StoreGitError()
    )
    with failing_load:
        reply = await api_client.post("/supervisor/options", json=payload)

    # The request as a whole is rejected and the repository is not stored ...
    assert reply.status == 400
    assert REPO_URL not in coresys.store.repository_urls

    # ... while the debug flag was still applied and both stores were saved once.
    assert coresys.config.debug
    coresys.updater.save_data.assert_called_once()
    coresys.config.save_data.assert_called_once()
|
|
|
|
|
|
async def test_api_supervisor_options_auto_update(
    api_client: TestClient, coresys: CoreSys
):
    """Check that auto update can be switched off through POST /supervisor/options."""
    assert coresys.updater.auto_update is True

    payload = {"auto_update": False}
    reply = await api_client.post("/supervisor/options", json=payload)
    assert reply.status == 200

    assert coresys.updater.auto_update is False
|
|
|
|
|
|
async def test_api_supervisor_options_diagnostics(
    api_client: TestClient,
    coresys: CoreSys,
    os_agent_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
    """Check toggling diagnostics on and off through POST /supervisor/options."""
    agent_mock: OSAgentService = os_agent_services["os_agent"]

    # Start from a known "diagnostics off" state on the mocked OS agent.
    agent_mock.Diagnostics = False
    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is False

    # Turning diagnostics on is expected to initialise sentry exactly once.
    with patch("supervisor.utils.sentry.sentry_sdk.init") as init_mock:
        reply = await api_client.post(
            "/supervisor/options", json={"diagnostics": True}
        )
        assert reply.status == 200
        init_mock.assert_called_once()

    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is True

    # Turning it off again is expected to close sentry exactly once.
    with patch("supervisor.api.supervisor.close_sentry") as close_mock:
        reply = await api_client.post(
            "/supervisor/options", json={"diagnostics": False}
        )
        assert reply.status == 200
        close_mock.assert_called_once()

    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is False
|
|
|
|
|
|
async def test_api_supervisor_logs(advanced_logs_tester):
    """Run the advanced_logs_tester fixture against the supervisor log endpoints."""
    tester_args = ("/supervisor", "hassio_supervisor")
    await advanced_logs_tester(*tester_args)
|
|
|
|
|
|
async def test_api_supervisor_fallback(
    api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock
):
    """Check that supervisor log endpoints still answer when journald_logs raises."""

    async def fetch_with_logged_failure(path: str):
        """GET *path* and verify the failure was logged exactly once."""
        with patch("supervisor.api._LOGGER.exception") as log_exception:
            reply = await api_client.get(path)
            log_exception.assert_called_once_with(
                "Failed to get supervisor logs using advanced_logs API"
            )
        return reply

    journald_logs.side_effect = HassioError("Something bad happened!")

    resp = await fetch_with_logged_failure("/supervisor/logs")
    assert resp.status == 200
    assert resp.content_type == "text/plain"
    payload = await resp.read()
    leading_lines = payload.split(b"\n")[0:2]
    assert leading_lines == [
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os\x1b[0m",
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os/AppArmor\x1b[0m",
    ]

    # The /follow endpoint must fall back as well (no mock reset needed).
    resp = await fetch_with_logged_failure("/supervisor/logs/follow")
    assert resp.status == 200
    assert resp.content_type == "text/plain"

    # Same for the /latest endpoint.
    resp = await fetch_with_logged_failure("/supervisor/logs/latest")
    assert resp.status == 200
    assert resp.content_type == "text/plain"

    # A generic Python error must be handled the same way.
    journald_logs.side_effect = OSError("Something bad happened!")

    resp = await fetch_with_logged_failure("/supervisor/logs")
    assert resp.status == 200
    assert resp.content_type == "text/plain"
|
|
|
|
|
|
async def test_api_supervisor_fallback_log_capture(
    api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock
):
    """Check that exception capture runs for HassioError but not HostNotSupportedError."""
    capture_target = "supervisor.api.async_capture_exception"

    # HostNotSupportedError must not be captured.
    journald_logs.side_effect = HostNotSupportedError(
        "No systemd-journal-gatewayd Unix socket available!"
    )
    with patch(capture_target) as capture_mock:
        await api_client.get("/supervisor/logs")
        capture_mock.assert_not_called()

    journald_logs.reset_mock()

    # A plain HassioError must be captured exactly once.
    journald_logs.side_effect = HassioError("Something bad happened!")
    with patch(capture_target) as capture_mock:
        await api_client.get("/supervisor/logs")
        capture_mock.assert_called_once()
|
|
|
|
|
|
async def test_api_supervisor_reload(
    api_client: TestClient, supervisor_internet: AsyncMock, websession: MagicMock
):
    """Check that POST /supervisor/reload answers with an empty OK result."""
    reply = await api_client.post("/supervisor/reload")
    assert reply.status == 200

    expected_body = {"result": "ok", "data": {}}
    assert await reply.json() == expected_body
|
|
|
|
|
|
async def test_api_supervisor_options_timezone(
    api_client: TestClient, coresys: CoreSys
):
    """Check that the timezone can be changed through POST /supervisor/options."""
    assert coresys.timezone == "Etc/UTC"

    payload = {"timezone": "Europe/Zurich"}
    reply = await api_client.post("/supervisor/options", json=payload)
    assert reply.status == 200

    assert coresys.timezone == "Europe/Zurich"
|
|
|
|
|
|
async def test_api_supervisor_options_country(api_client: TestClient, coresys: CoreSys):
    """Check that the country can be set via the API and is reported by /supervisor/info."""
    assert coresys.config.country is None

    reply = await api_client.post("/supervisor/options", json={"country": "CH"})
    assert reply.status == 200

    assert coresys.config.country == "CH"

    # The info endpoint must report the value that was just stored.
    reply = await api_client.get("/supervisor/info")
    assert reply.status == 200
    info = await reply.json()
    assert info["data"]["country"] == "CH"
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("blockbuster", "option_value", "config_value"),
    [
        ("no_blockbuster", "on", False),
        ("no_blockbuster", "on-at-startup", True),
    ],
    indirect=["blockbuster"],
)
async def test_api_supervisor_options_blocking_io(
    api_client: TestClient, coresys: CoreSys, option_value: str, config_value: bool
):
    """Check switching blocking I/O detection on and off through the options API."""
    # Detection is not active yet, so this call must not raise.
    time.sleep(0)

    reply = await api_client.post(
        "/supervisor/options", json={"detect_blocking_io": option_value}
    )
    assert reply.status == 200

    reply = await api_client.get("/supervisor/info")
    assert reply.status == 200
    info = await reply.json()
    assert info["data"]["detect_blocking_io"] is True

    # The stored config flag follows the parametrized expectation: False for
    # "on", True for "on-at-startup".
    assert coresys.config.detect_blocking_io is config_value

    # With detection active, the same call is now expected to raise.
    with pytest.raises(BlockingError):
        time.sleep(0)

    reply = await api_client.post(
        "/supervisor/options", json={"detect_blocking_io": "off"}
    )
    assert reply.status == 200

    reply = await api_client.get("/supervisor/info")
    assert reply.status == 200
    info = await reply.json()
    assert info["data"]["detect_blocking_io"] is False
    assert coresys.config.detect_blocking_io is False

    # Detection is off again, so the call must pass once more.
    time.sleep(0)
|
|
|
|
|
|
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_api_progress_updates_supervisor_update(
    api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
    """Check the job progress events sent over the websocket client during a supervisor update."""
    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.core.set_state(CoreState.RUNNING)

    # Feed the image pull mock with the recorded docker pull log fixture.
    pull_log = load_json_fixture("docker_pull_image_log.json")
    coresys.docker.images.pull.return_value = AsyncIterator(pull_log)

    current_version = PropertyMock(return_value=AwesomeVersion("2025.08.0"))
    target_version = PropertyMock(return_value=AwesomeVersion("2025.08.3"))
    image_name = PropertyMock(return_value="supervisor")
    with (
        patch.object(Supervisor, "version", new=current_version),
        patch.object(Updater, "version_supervisor", new=target_version),
        patch.object(Updater, "image_supervisor", new=image_name),
        patch.object(Supervisor, "update_apparmor"),
        patch.object(Core, "stop"),
    ):
        resp = await api_client.post("/supervisor/update")

    assert resp.status == 200

    # Collect stage/progress/done of every "supervisor_update" job event sent.
    events = []
    for call in ha_ws_client.async_send_command.call_args_list:
        message = call.args[0]
        if "data" not in message:
            continue
        if message["data"]["event"] != WSEvent.JOB:
            continue
        job = message["data"]["data"]
        if job["name"] != "supervisor_update":
            continue
        events.append(
            {
                "stage": job["stage"],
                "progress": job["progress"],
                "done": job["done"],
            }
        )

    def pending(progress):
        """Build the expected event for an unfinished job at *progress*."""
        return {"stage": None, "progress": progress, "done": False}

    assert events[:4] == [pending(value) for value in (0, 0.1, 1.2, 2.8)]
    assert events[-5:] == [
        *(pending(value) for value in (97.2, 98.4, 99.4, 100)),
        {"stage": None, "progress": 100, "done": True},
    ]
|