"""Test OS API.""" import asyncio from pathlib import Path from unittest.mock import PropertyMock, patch from aiohttp.test_utils import TestClient import pytest from supervisor.coresys import CoreSys from supervisor.dbus.agent.boards import BoardManager from supervisor.hardware.data import Device from supervisor.os.manager import OSManager from supervisor.resolution.const import ContextType, IssueType, SuggestionType from supervisor.resolution.data import Issue, Suggestion # pylint: disable=protected-access @pytest.mark.asyncio async def test_api_os_info(api_client: TestClient): """Test docker info api.""" resp = await api_client.get("/os/info") result = await resp.json() for attr in ( "version", "version_latest", "update_available", "board", "boot", "data_disk", ): assert attr in result["data"] @pytest.mark.asyncio async def test_api_os_info_with_agent(api_client: TestClient, coresys: CoreSys): """Test docker info api.""" await coresys.dbus.agent.connect(coresys.dbus.bus) await coresys.dbus.agent.update() resp = await api_client.get("/os/info") result = await resp.json() assert result["data"]["data_disk"] == "/dev/sda" @pytest.mark.asyncio async def test_api_os_datadisk_move(api_client: TestClient, coresys: CoreSys): """Test datadisk move without exists disk.""" await coresys.dbus.agent.connect(coresys.dbus.bus) await coresys.dbus.agent.update() coresys.os._available = True resp = await api_client.post("/os/datadisk/move", json={"device": "/dev/sdaaaa"}) result = await resp.json() assert result["message"] == "'/dev/sdaaaa' don't exists on the host!" 
@pytest.mark.asyncio
async def test_api_os_datadisk_list(api_client: TestClient, coresys: CoreSys):
    """Test that the datadisk list contains the disk but not its partition."""
    agent = coresys.dbus.agent
    await agent.connect(coresys.dbus.bus)
    await agent.update()

    whole_disk = Device(
        "sda",
        Path("/dev/sda"),
        Path("/sys/bus/usb/000"),
        "block",
        None,
        [Path("/dev/serial/by-id/test")],
        {"ID_NAME": "xy", "MINOR": "0", "DEVTYPE": "disk"},
        [],
    )
    partition = Device(
        "sda1",
        Path("/dev/sda1"),
        Path("/sys/bus/usb/000/1"),
        "block",
        None,
        [Path("/dev/serial/by-id/test1")],
        {"ID_NAME": "xy", "MINOR": "1", "DEVTYPE": "partition"},
        [],
    )
    for device in (whole_disk, partition):
        coresys.hardware.update_device(device)

    response = await api_client.get("/os/datadisk/list")
    body = await response.json()

    assert body["data"]["devices"] == ["/dev/sda"]


async def test_api_board_yellow_info(api_client: TestClient, coresys: CoreSys):
    """Test reading the yellow board LED settings."""
    await coresys.dbus.agent.board.connect(coresys.dbus.bus)

    response = await api_client.get("/os/boards/yellow")
    assert response.status == 200

    led_state = (await response.json())["data"]
    for led in ("disk_led", "heartbeat_led", "power_led"):
        assert led_state[led] is True

    # Any board other than the active one must be rejected.
    for other_board_url in ("/os/boards/supervised", "/os/boards/not-real"):
        assert (await api_client.get(other_board_url)).status == 400


async def test_api_board_yellow_options(
    api_client: TestClient, coresys: CoreSys, dbus: list[str]
):
    """Test changing yellow board LED settings and the resulting reboot notice."""
    await coresys.dbus.agent.board.connect(coresys.dbus.bus)

    assert len(coresys.resolution.issues) == 0
    # Drop calls recorded so far so only the option writes are compared below.
    dbus.clear()

    led_options = {"disk_led": False, "heartbeat_led": False, "power_led": False}
    response = await api_client.post("/os/boards/yellow", json=led_options)
    assert response.status == 200

    # Yield to the event loop once before inspecting the recorded D-Bus calls.
    await asyncio.sleep(0)
    expected_calls = [
        "/io/hass/os/Boards/Yellow-io.hass.os.Boards.Yellow.DiskLED",
        "/io/hass/os/Boards/Yellow-io.hass.os.Boards.Yellow.HeartbeatLED",
        "/io/hass/os/Boards/Yellow-io.hass.os.Boards.Yellow.PowerLED",
    ]
    assert dbus == expected_calls

    reboot_issue = Issue(IssueType.REBOOT_REQUIRED, ContextType.SYSTEM)
    assert reboot_issue in coresys.resolution.issues

    reboot_suggestion = Suggestion(SuggestionType.EXECUTE_REBOOT, ContextType.SYSTEM)
    assert reboot_suggestion in coresys.resolution.suggestions


async def test_api_board_supervised_info(api_client: TestClient, coresys: CoreSys):
    """Test board endpoints when the board is reported as Supervised."""
    product_patch = patch(
        "supervisor.os.manager.CPE.get_product", return_value=["not-hassos"]
    )
    board_patch = patch.object(
        BoardManager, "board", new=PropertyMock(return_value="Supervised")
    )

    with product_patch, board_patch:
        bus = coresys.dbus.bus
        await coresys.dbus.agent.board.connect(bus)
        await coresys.dbus.hostname.connect(bus)
        await coresys.os.load()

        info = await api_client.get("/os/boards/supervised")
        assert info.status == 200

        options = await api_client.post("/os/boards/supervised", json={})
        assert options.status == 405

        for other_board_url in ("/os/boards/yellow", "/os/boards/not-real"):
            assert (await api_client.get(other_board_url)).status == 400


async def test_api_board_other_info(api_client: TestClient, coresys: CoreSys):
    """Test board endpoints for a board name patched to "not-real"."""
    agent_board_patch = patch.object(
        BoardManager, "board", new=PropertyMock(return_value="not-real")
    )
    os_board_patch = patch.object(
        OSManager, "board", new=PropertyMock(return_value="not-real")
    )

    with agent_board_patch, os_board_patch:
        await coresys.dbus.agent.board.connect(coresys.dbus.bus)

        info = await api_client.get("/os/boards/not-real")
        assert info.status == 200

        options = await api_client.post("/os/boards/not-real", json={})
        assert options.status == 405

        for other_board_url in ("/os/boards/yellow", "/os/boards/supervised"):
            assert (await api_client.get(other_board_url)).status == 400