mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-10-18 08:09:36 +00:00

* Add progress reporting to addon, HA and Supervisor updates * Fix assert in test * Add progress to addon, core, supervisor updates/installs * Fix double install bug in addons install * Remove initial_install and re-arrange order of load
420 lines
14 KiB
Python
420 lines
14 KiB
Python
"""Test Supervisor API."""
|
|
|
|
# pylint: disable=protected-access
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
|
|
|
from aiohttp.test_utils import TestClient
|
|
from awesomeversion import AwesomeVersion
|
|
from blockbuster import BlockingError
|
|
import pytest
|
|
|
|
from supervisor.const import CoreState
|
|
from supervisor.core import Core
|
|
from supervisor.coresys import CoreSys
|
|
from supervisor.exceptions import HassioError, HostNotSupportedError, StoreGitError
|
|
from supervisor.homeassistant.const import WSEvent
|
|
from supervisor.store.repository import Repository
|
|
from supervisor.supervisor import Supervisor
|
|
from supervisor.updater import Updater
|
|
|
|
from tests.api import common_test_api_advanced_logs
|
|
from tests.common import load_json_fixture
|
|
from tests.dbus_service_mocks.base import DBusServiceMock
|
|
from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService
|
|
|
|
# Repository URL posted as "addons_repositories" by the /supervisor/options tests in this module.
REPO_URL = "https://github.com/awesome-developer/awesome-repo"
|
|
|
|
|
|
async def test_api_supervisor_options_debug(api_client: TestClient, coresys: CoreSys):
    """Test enabling debug mode via POST /supervisor/options REST API."""
    assert not coresys.config.debug

    response = await api_client.post("/supervisor/options", json={"debug": True})

    # Check the status too, matching the other option tests in this module,
    # so a rejected request is reported as such rather than as a config mismatch.
    assert response.status == 200
    assert coresys.config.debug
|
|
|
|
|
|
async def test_api_supervisor_options_add_repository(
    api_client: TestClient, coresys: CoreSys, supervisor_internet: AsyncMock
):
    """Check that POST /supervisor/options adds a repository URL to the store.

    RepositoryGit.load and RepositoryGit.validate are patched to return
    None and True respectively for the duration of the request.
    """
    assert REPO_URL not in coresys.store.repository_urls

    payload = {"addons_repositories": [REPO_URL]}
    load_patch = patch(
        "supervisor.store.repository.RepositoryGit.load", return_value=None
    )
    validate_patch = patch(
        "supervisor.store.repository.RepositoryGit.validate", return_value=True
    )

    with load_patch, validate_patch:
        resp = await api_client.post("/supervisor/options", json=payload)

    assert resp.status == 200
    assert REPO_URL in coresys.store.repository_urls
|
|
|
|
|
|
async def test_api_supervisor_options_remove_repository(
    api_client: TestClient, coresys: CoreSys, test_repository: Repository
):
    """Check that posting an empty repository list removes the test repository."""
    source = test_repository.source
    slug = test_repository.slug

    # The fixture repository is registered before the request.
    assert source in coresys.store.repository_urls
    assert slug in coresys.store.repositories

    payload = {"addons_repositories": []}
    resp = await api_client.post("/supervisor/options", json=payload)

    # After a successful request it is gone from both lookups.
    assert resp.status == 200
    assert source not in coresys.store.repository_urls
    assert slug not in coresys.store.repositories
|
|
|
|
|
|
@pytest.mark.parametrize("git_error", [None, StoreGitError()])
async def test_api_supervisor_options_repositories_skipped_on_error(
    api_client: TestClient, coresys: CoreSys, git_error: StoreGitError
):
    """Check that a repository failing to load/validate is rejected and not stored.

    Runs once with RepositoryGit.load succeeding and once with it raising
    StoreGitError; RepositoryGit.validate returns False in both cases.
    """
    payload = {"addons_repositories": [REPO_URL]}

    with (
        patch("supervisor.store.repository.RepositoryGit.load", side_effect=git_error),
        patch("supervisor.store.repository.RepositoryGit.validate", return_value=False),
        patch("supervisor.store.repository.RepositoryCustom.remove"),
    ):
        resp = await api_client.post("/supervisor/options", json=payload)

    assert resp.status == 400
    # No resolution suggestion is created and the URL is not kept.
    assert not coresys.resolution.suggestions
    assert REPO_URL not in coresys.store.repository_urls
|
|
|
|
|
|
async def test_api_supervisor_options_repo_error_with_config_change(
    api_client: TestClient, coresys: CoreSys
):
    """Check that config changes are applied even when adding a repository fails.

    The request sets debug and adds a repository whose load raises
    StoreGitError: the response is 400, yet debug is set and both the
    updater and config data are saved exactly once.
    """
    assert not coresys.config.debug

    payload = {"debug": True, "addons_repositories": [REPO_URL]}
    failing_load = patch(
        "supervisor.store.repository.RepositoryGit.load", side_effect=StoreGitError()
    )

    with failing_load:
        resp = await api_client.post("/supervisor/options", json=payload)

    # Repository part of the request failed ...
    assert resp.status == 400
    assert REPO_URL not in coresys.store.repository_urls

    # ... but the debug change went through and was saved.
    assert coresys.config.debug
    coresys.updater.save_data.assert_called_once()
    coresys.config.save_data.assert_called_once()
|
|
|
|
|
|
async def test_api_supervisor_options_auto_update(
    api_client: TestClient, coresys: CoreSys
):
    """Check that auto update can be switched off via POST /supervisor/options."""
    updater = coresys.updater
    assert updater.auto_update is True

    payload = {"auto_update": False}
    resp = await api_client.post("/supervisor/options", json=payload)

    assert resp.status == 200
    assert updater.auto_update is False
|
|
|
|
|
|
async def test_api_supervisor_options_diagnostics(
    api_client: TestClient,
    coresys: CoreSys,
    os_agent_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]],
):
    """Check toggling diagnostics on and then off again via the options API.

    Enabling is expected to call sentry_sdk.init once, disabling is expected
    to call close_sentry once, and each change must be visible through the
    OS Agent D-Bus mock afterwards.
    """
    agent_mock: OSAgentService = os_agent_services["os_agent"]
    agent_mock.Diagnostics = False
    await agent_mock.ping()
    assert coresys.dbus.agent.diagnostics is False

    # (patched callable expected to be invoked, diagnostics value to set)
    phases = (
        ("supervisor.utils.sentry.sentry_sdk.init", True),
        ("supervisor.api.supervisor.close_sentry", False),
    )
    for sentry_target, enabled in phases:
        with patch(sentry_target) as sentry_hook:
            resp = await api_client.post(
                "/supervisor/options", json={"diagnostics": enabled}
            )
            assert resp.status == 200
            sentry_hook.assert_called_once()

        await agent_mock.ping()
        assert coresys.dbus.agent.diagnostics is enabled
|
|
|
|
|
|
async def test_api_supervisor_logs(
    api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available
):
    """Run the shared advanced-logs checks against the /supervisor endpoints."""
    route = "/supervisor"
    identifier = "hassio_supervisor"

    await common_test_api_advanced_logs(
        route, identifier, api_client, journald_logs, coresys, os_available
    )
|
|
|
|
|
|
async def test_api_supervisor_fallback(
    api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock
):
    """Check that supervisor logs still come back when the journald read fails.

    Each request must log the failure once via _LOGGER.exception and answer
    with 200 and a text/plain body. Covers /logs, /logs/follow and
    /logs/latest for a HassioError, then /logs again for an OSError.
    """

    async def _get_with_fallback(url: str):
        """Request *url*, assert the failure was logged, and return the response."""
        with patch("supervisor.api._LOGGER.exception") as log_exception:
            response = await api_client.get(url)
            log_exception.assert_called_once_with(
                "Failed to get supervisor logs using advanced_logs API"
            )

        assert response.status == 200
        assert response.content_type == "text/plain"
        return response

    journald_logs.side_effect = HassioError("Something bad happened!")

    resp = await _get_with_fallback("/supervisor/logs")
    body = await resp.read()
    first_two_lines = body.split(b"\n")[:2]
    assert first_two_lines == [
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os\x1b[0m",
        b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os/AppArmor\x1b[0m",
    ]

    # The /follow endpoint falls back the same way (no mock reset needed)
    await _get_with_fallback("/supervisor/logs/follow")

    # ... and so does the /latest endpoint
    await _get_with_fallback("/supervisor/logs/latest")

    # A generic Python error must trigger the fallback too
    journald_logs.side_effect = OSError("Something bad happened!")
    await _get_with_fallback("/supervisor/logs")
|
|
|
|
|
|
async def test_api_supervisor_fallback_log_capture(
    api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock
):
    """Check that only unexpected log errors are passed to async_capture_exception.

    A HostNotSupportedError must not be captured, while a plain HassioError
    must be captured exactly once.
    """
    capture_target = "supervisor.api.async_capture_exception"
    logs_url = "/supervisor/logs"

    # Expected failure: no capture.
    journald_logs.side_effect = HostNotSupportedError(
        "No systemd-journal-gatewayd Unix socket available!"
    )
    with patch(capture_target) as captured:
        await api_client.get(logs_url)
        captured.assert_not_called()

    journald_logs.reset_mock()

    # Unexpected failure: captured once.
    journald_logs.side_effect = HassioError("Something bad happened!")
    with patch(capture_target) as captured:
        await api_client.get(logs_url)
        captured.assert_called_once()
|
|
|
|
|
|
async def test_api_supervisor_reload(
    api_client: TestClient, supervisor_internet: AsyncMock, websession: MagicMock
):
    """Check that POST /supervisor/reload answers with an empty ok result."""
    response = await api_client.post("/supervisor/reload")
    assert response.status == 200

    payload = await response.json()
    assert payload == {"result": "ok", "data": {}}
|
|
|
|
|
|
async def test_api_supervisor_options_timezone(
    api_client: TestClient, coresys: CoreSys
):
    """Check that the timezone can be changed via POST /supervisor/options."""
    assert coresys.timezone == "Etc/UTC"

    payload = {"timezone": "Europe/Zurich"}
    response = await api_client.post("/supervisor/options", json=payload)

    assert response.status == 200
    assert coresys.timezone == "Europe/Zurich"
|
|
|
|
|
|
async def test_api_supervisor_options_country(api_client: TestClient, coresys: CoreSys):
    """Check that the country can be set via options and is reported by info."""
    assert coresys.config.country is None

    # Set the country through the options endpoint.
    post_resp = await api_client.post("/supervisor/options", json={"country": "CH"})
    assert post_resp.status == 200
    assert coresys.config.country == "CH"

    # The info endpoint must report the same value.
    info_resp = await api_client.get("/supervisor/info")
    assert info_resp.status == 200
    info = await info_resp.json()
    assert info["data"]["country"] == "CH"
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("blockbuster", "option_value", "config_value"),
    [("no_blockbuster", "on", False), ("no_blockbuster", "on-at-startup", True)],
    indirect=["blockbuster"],
)
async def test_api_supervisor_options_blocking_io(
    api_client: TestClient, coresys: CoreSys, option_value: str, config_value: bool
):
    """Check switching blocking I/O detection on and off via the options API.

    ``option_value`` is the value posted to turn detection on and
    ``config_value`` is what the stored config is expected to hold afterwards.
    """

    async def _reported_state():
        """Return the detect_blocking_io flag reported by /supervisor/info."""
        info_resp = await api_client.get("/supervisor/info")
        assert info_resp.status == 200
        info = await info_resp.json()
        return info["data"]["detect_blocking_io"]

    # Detection is off at this point, so a blocking call must pass
    time.sleep(0)

    enable_resp = await api_client.post(
        "/supervisor/options", json={"detect_blocking_io": option_value}
    )
    assert enable_resp.status == 200

    assert await _reported_state() is True
    # The stored config follows the parametrized expectation: False for "on",
    # True for "on-at-startup"
    assert coresys.config.detect_blocking_io is config_value

    # Detection is active now, so the same call must be flagged
    with pytest.raises(BlockingError):
        time.sleep(0)

    disable_resp = await api_client.post(
        "/supervisor/options", json={"detect_blocking_io": "off"}
    )
    assert disable_resp.status == 200

    assert await _reported_state() is False
    assert coresys.config.detect_blocking_io is False

    # With detection off again the blocking call must pass once more
    time.sleep(0)
|
|
|
|
|
|
@pytest.mark.usefixtures("tmp_supervisor_data")
async def test_api_progress_updates_supervisor_update(
    api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock
):
    """Check the job progress events sent to Home Assistant during a Supervisor update.

    The docker pull is fed from the ``docker_pull_image_log.json`` fixture and
    the "supervisor_update" job events captured on the websocket client mock
    are compared against the expected first four and last five entries.
    """

    def _expected(progress, done=False):
        """Build the expected stage/progress/done summary of one job event."""
        return {"stage": None, "progress": progress, "done": done}

    coresys.hardware.disk.get_disk_free_space = lambda x: 5000
    coresys.core.set_state(CoreState.RUNNING)
    pull_log = load_json_fixture("docker_pull_image_log.json")
    coresys.docker.docker.api.pull.return_value = pull_log

    current_version = PropertyMock(return_value=AwesomeVersion("2025.08.0"))
    target_version = PropertyMock(return_value=AwesomeVersion("2025.08.3"))
    target_image = PropertyMock(return_value="supervisor")

    with (
        patch.object(Supervisor, "version", new=current_version),
        patch.object(Updater, "version_supervisor", new=target_version),
        patch.object(Updater, "image_supervisor", new=target_image),
        patch.object(Supervisor, "update_apparmor"),
        patch.object(Core, "stop"),
    ):
        resp = await api_client.post("/supervisor/update")

    assert resp.status == 200

    # Keep only the job events that belong to the supervisor update job.
    events = []
    for call in ha_ws_client.async_send_command.call_args_list:
        message = call.args[0]
        if "data" not in message:
            continue
        if not message["data"]["event"] == WSEvent.JOB:
            continue
        job = message["data"]["data"]
        if not job["name"] == "supervisor_update":
            continue
        events.append(
            {
                "stage": job["stage"],
                "progress": job["progress"],
                "done": job["done"],
            }
        )

    assert events[:4] == [
        _expected(0),
        _expected(0.1),
        _expected(1.2),
        _expected(2.8),
    ]
    assert events[-5:] == [
        _expected(97.2),
        _expected(98.4),
        _expected(99.4),
        _expected(100),
        _expected(100, done=True),
    ]
|