"""Test Supervisor API.""" # pylint: disable=protected-access import time from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch from aiohttp.test_utils import TestClient from awesomeversion import AwesomeVersion from blockbuster import BlockingError import pytest from supervisor.const import CoreState from supervisor.core import Core from supervisor.coresys import CoreSys from supervisor.exceptions import HassioError, HostNotSupportedError, StoreGitError from supervisor.homeassistant.const import WSEvent from supervisor.store.repository import Repository from supervisor.supervisor import Supervisor from supervisor.updater import Updater from tests.api import common_test_api_advanced_logs from tests.common import load_json_fixture from tests.dbus_service_mocks.base import DBusServiceMock from tests.dbus_service_mocks.os_agent import OSAgent as OSAgentService REPO_URL = "https://github.com/awesome-developer/awesome-repo" async def test_api_supervisor_options_debug(api_client: TestClient, coresys: CoreSys): """Test security options force security.""" assert not coresys.config.debug await api_client.post("/supervisor/options", json={"debug": True}) assert coresys.config.debug async def test_api_supervisor_options_add_repository( api_client: TestClient, coresys: CoreSys, supervisor_internet: AsyncMock ): """Test add a repository via POST /supervisor/options REST API.""" assert REPO_URL not in coresys.store.repository_urls with ( patch("supervisor.store.repository.RepositoryGit.load", return_value=None), patch("supervisor.store.repository.RepositoryGit.validate", return_value=True), ): response = await api_client.post( "/supervisor/options", json={"addons_repositories": [REPO_URL]} ) assert response.status == 200 assert REPO_URL in coresys.store.repository_urls async def test_api_supervisor_options_remove_repository( api_client: TestClient, coresys: CoreSys, test_repository: Repository ): """Test remove a repository via POST /supervisor/options REST API.""" assert test_repository.source in coresys.store.repository_urls assert test_repository.slug in coresys.store.repositories response = await api_client.post( "/supervisor/options", json={"addons_repositories": []} ) assert response.status == 200 assert test_repository.source not in coresys.store.repository_urls assert test_repository.slug not in coresys.store.repositories @pytest.mark.parametrize("git_error", [None, StoreGitError()]) async def test_api_supervisor_options_repositories_skipped_on_error( api_client: TestClient, coresys: CoreSys, git_error: StoreGitError ): """Test repositories skipped on error via POST /supervisor/options REST API.""" with ( patch("supervisor.store.repository.RepositoryGit.load", side_effect=git_error), patch("supervisor.store.repository.RepositoryGit.validate", return_value=False), patch("supervisor.store.repository.RepositoryCustom.remove"), ): response = await api_client.post( "/supervisor/options", json={"addons_repositories": [REPO_URL]} ) assert response.status == 400 assert len(coresys.resolution.suggestions) == 0 assert REPO_URL not in coresys.store.repository_urls async def test_api_supervisor_options_repo_error_with_config_change( api_client: TestClient, coresys: CoreSys ): """Test config change with add repository error via POST /supervisor/options REST API.""" assert not coresys.config.debug with patch( "supervisor.store.repository.RepositoryGit.load", side_effect=StoreGitError() ): response = await api_client.post( "/supervisor/options", json={"debug": True, "addons_repositories": [REPO_URL]}, ) assert response.status == 400 assert REPO_URL not in coresys.store.repository_urls assert coresys.config.debug coresys.updater.save_data.assert_called_once() coresys.config.save_data.assert_called_once() async def test_api_supervisor_options_auto_update( api_client: TestClient, coresys: CoreSys ): """Test disabling auto update via api.""" assert coresys.updater.auto_update is True response = await api_client.post("/supervisor/options", json={"auto_update": False}) assert response.status == 200 assert coresys.updater.auto_update is False async def test_api_supervisor_options_diagnostics( api_client: TestClient, coresys: CoreSys, os_agent_services: dict[str, DBusServiceMock | dict[str, DBusServiceMock]], ): """Test changing diagnostics.""" os_agent_service: OSAgentService = os_agent_services["os_agent"] os_agent_service.Diagnostics = False await os_agent_service.ping() assert coresys.dbus.agent.diagnostics is False with patch("supervisor.utils.sentry.sentry_sdk.init") as sentry_init: response = await api_client.post( "/supervisor/options", json={"diagnostics": True} ) assert response.status == 200 sentry_init.assert_called_once() await os_agent_service.ping() assert coresys.dbus.agent.diagnostics is True with patch("supervisor.api.supervisor.close_sentry") as close_sentry: response = await api_client.post( "/supervisor/options", json={"diagnostics": False} ) assert response.status == 200 close_sentry.assert_called_once() await os_agent_service.ping() assert coresys.dbus.agent.diagnostics is False async def test_api_supervisor_logs( api_client: TestClient, journald_logs: MagicMock, coresys: CoreSys, os_available ): """Test supervisor logs.""" await common_test_api_advanced_logs( "/supervisor", "hassio_supervisor", api_client, journald_logs, coresys, os_available, ) async def test_api_supervisor_fallback( api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock ): """Check that supervisor logs read from container logs if reading from journald gateway fails badly.""" journald_logs.side_effect = HassioError("Something bad happened!") with patch("supervisor.api._LOGGER.exception") as logger: resp = await api_client.get("/supervisor/logs") logger.assert_called_once_with( "Failed to get supervisor logs using advanced_logs API" ) assert resp.status == 200 assert resp.content_type == "text/plain" content = await resp.read() assert content.split(b"\n")[0:2] == [ b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os\x1b[0m", b"\x1b[36m22-10-11 14:04:23 DEBUG (MainThread) [supervisor.utils.dbus] D-Bus call - org.freedesktop.DBus.Properties.call_get_all on /io/hass/os/AppArmor\x1b[0m", ] # check fallback also works for the /follow endpoint (no mock reset needed) with patch("supervisor.api._LOGGER.exception") as logger: resp = await api_client.get("/supervisor/logs/follow") logger.assert_called_once_with( "Failed to get supervisor logs using advanced_logs API" ) assert resp.status == 200 assert resp.content_type == "text/plain" # check the /latest endpoint as well with patch("supervisor.api._LOGGER.exception") as logger: resp = await api_client.get("/supervisor/logs/latest") logger.assert_called_once_with( "Failed to get supervisor logs using advanced_logs API" ) assert resp.status == 200 assert resp.content_type == "text/plain" # also check generic Python error journald_logs.side_effect = OSError("Something bad happened!") with patch("supervisor.api._LOGGER.exception") as logger: resp = await api_client.get("/supervisor/logs") logger.assert_called_once_with( "Failed to get supervisor logs using advanced_logs API" ) assert resp.status == 200 assert resp.content_type == "text/plain" async def test_api_supervisor_fallback_log_capture( api_client: TestClient, journald_logs: MagicMock, docker_logs: MagicMock ): """Check that Sentry log capture is executed only for unexpected errors.""" journald_logs.side_effect = HostNotSupportedError( "No systemd-journal-gatewayd Unix socket available!" ) with patch("supervisor.api.async_capture_exception") as capture_exception: await api_client.get("/supervisor/logs") capture_exception.assert_not_called() journald_logs.reset_mock() journald_logs.side_effect = HassioError("Something bad happened!") with patch("supervisor.api.async_capture_exception") as capture_exception: await api_client.get("/supervisor/logs") capture_exception.assert_called_once() async def test_api_supervisor_reload( api_client: TestClient, supervisor_internet: AsyncMock, websession: MagicMock ): """Test supervisor reload.""" resp = await api_client.post("/supervisor/reload") assert resp.status == 200 assert await resp.json() == {"result": "ok", "data": {}} async def test_api_supervisor_options_timezone( api_client: TestClient, coresys: CoreSys ): """Test setting supervisor timezone via API.""" assert coresys.timezone == "Etc/UTC" resp = await api_client.post( "/supervisor/options", json={"timezone": "Europe/Zurich"} ) assert resp.status == 200 assert coresys.timezone == "Europe/Zurich" async def test_api_supervisor_options_country(api_client: TestClient, coresys: CoreSys): """Test setting supervisor country via API.""" assert coresys.config.country is None resp = await api_client.post("/supervisor/options", json={"country": "CH"}) assert resp.status == 200 assert coresys.config.country == "CH" resp = await api_client.get("/supervisor/info") assert resp.status == 200 body = await resp.json() assert body["data"]["country"] == "CH" @pytest.mark.parametrize( ("blockbuster", "option_value", "config_value"), [("no_blockbuster", "on", False), ("no_blockbuster", "on-at-startup", True)], indirect=["blockbuster"], ) async def test_api_supervisor_options_blocking_io( api_client: TestClient, coresys: CoreSys, option_value: str, config_value: bool ): """Test setting supervisor detect blocking io option.""" # This should not fail with a blocking error yet time.sleep(0) resp = await api_client.post( "/supervisor/options", json={"detect_blocking_io": option_value} ) assert resp.status == 200 resp = await api_client.get("/supervisor/info") assert resp.status == 200 body = await resp.json() assert body["data"]["detect_blocking_io"] is True # This remains false because we only turned it on for current run of supervisor, not permanently assert coresys.config.detect_blocking_io is config_value with pytest.raises(BlockingError): time.sleep(0) resp = await api_client.post( "/supervisor/options", json={"detect_blocking_io": "off"} ) assert resp.status == 200 resp = await api_client.get("/supervisor/info") assert resp.status == 200 body = await resp.json() assert body["data"]["detect_blocking_io"] is False assert coresys.config.detect_blocking_io is False # This should not raise blocking error anymore time.sleep(0) @pytest.mark.usefixtures("tmp_supervisor_data") async def test_api_progress_updates_supervisor_update( api_client: TestClient, coresys: CoreSys, ha_ws_client: AsyncMock ): """Test progress updates sent to Home Assistant for updates.""" coresys.hardware.disk.get_disk_free_space = lambda x: 5000 coresys.core.set_state(CoreState.RUNNING) coresys.docker.docker.api.pull.return_value = load_json_fixture( "docker_pull_image_log.json" ) with ( patch.object( Supervisor, "version", new=PropertyMock(return_value=AwesomeVersion("2025.08.0")), ), patch.object( Updater, "version_supervisor", new=PropertyMock(return_value=AwesomeVersion("2025.08.3")), ), patch.object( Updater, "image_supervisor", new=PropertyMock(return_value="supervisor") ), patch.object(Supervisor, "update_apparmor"), patch.object(Core, "stop"), ): resp = await api_client.post("/supervisor/update") assert resp.status == 200 events = [ { "stage": evt.args[0]["data"]["data"]["stage"], "progress": evt.args[0]["data"]["data"]["progress"], "done": evt.args[0]["data"]["data"]["done"], } for evt in ha_ws_client.async_send_command.call_args_list if "data" in evt.args[0] and evt.args[0]["data"]["event"] == WSEvent.JOB and evt.args[0]["data"]["data"]["name"] == "supervisor_update" ] assert events[:4] == [ { "stage": None, "progress": 0, "done": False, }, { "stage": None, "progress": 0.1, "done": False, }, { "stage": None, "progress": 1.2, "done": False, }, { "stage": None, "progress": 2.8, "done": False, }, ] assert events[-5:] == [ { "stage": None, "progress": 97.2, "done": False, }, { "stage": None, "progress": 98.4, "done": False, }, { "stage": None, "progress": 99.4, "done": False, }, { "stage": None, "progress": 100, "done": False, }, { "stage": None, "progress": 100, "done": True, }, ]