System Bridge coordinator and connector refactor (#114896)

* Update systembridgeconnector to 5.0.0.dev2

* Refactor

* Move out of single use method

* Update systembridgeconnector to 4.1.0.dev0 and systembridgemodels to 4.1.0

* Refactor WebSocket connection handling in SystemBridgeDataUpdateCoordinator

* Remove unnessasary fluff

* Update system_bridge requirements to version 4.1.0.dev1

* Set systembridgeconnector to 4.1.0

* Fix config flow tests

We'll make this better later

* Add missing tests for media source

* Update config flow tests

* Add missing check

* Refactor WebSocket connection handling in SystemBridgeDataUpdateCoordinator

* Move inside try

* Move log

* Cleanup log

* Fix disconnection update

* Set unregistered on disconnect

* Remove bool, use listener task

* Fix eager start

* == -> is

* Reduce errors

* Update test
This commit is contained in:
Aidan Timson 2024-07-17 17:39:24 +01:00 committed by GitHub
parent 843fae825f
commit 52b90621c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 612 additions and 267 deletions

View File

@ -55,7 +55,7 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from .config_flow import SystemBridgeConfigFlow
from .const import DOMAIN, MODULES
from .const import DATA_WAIT_TIMEOUT, DOMAIN, MODULES
from .coordinator import SystemBridgeDataUpdateCoordinator
_LOGGER = logging.getLogger(__name__)
@ -105,7 +105,7 @@ async def async_setup_entry(
)
supported = False
try:
async with asyncio.timeout(10):
async with asyncio.timeout(DATA_WAIT_TIMEOUT):
supported = await version.check_supported()
except AuthenticationException as exception:
_LOGGER.error("Authentication failed for %s: %s", entry.title, exception)
@ -161,8 +161,9 @@ async def async_setup_entry(
_LOGGER,
entry=entry,
)
try:
async with asyncio.timeout(10):
async with asyncio.timeout(DATA_WAIT_TIMEOUT):
await coordinator.async_get_data(MODULES)
except AuthenticationException as exception:
_LOGGER.error("Authentication failed for %s: %s", entry.title, exception)
@ -196,26 +197,6 @@ async def async_setup_entry(
# Fetch initial data so we have data when entities subscribe
await coordinator.async_config_entry_first_refresh()
try:
# Wait for initial data
async with asyncio.timeout(10):
while not coordinator.is_ready:
_LOGGER.debug(
"Waiting for initial data from %s (%s)",
entry.title,
entry.data[CONF_HOST],
)
await asyncio.sleep(1)
except TimeoutError as exception:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="timeout",
translation_placeholders={
"title": entry.title,
"host": entry.data[CONF_HOST],
},
) from exception
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = coordinator
@ -296,11 +277,11 @@ async def async_setup_entry(
coordinator: SystemBridgeDataUpdateCoordinator = hass.data[DOMAIN][
service_call.data[CONF_BRIDGE]
]
processes: list[Process] = coordinator.data.processes
# Find processes from list
items: list[dict[str, Any]] = [
asdict(process)
for process in processes
for process in coordinator.data.processes
if process.name is not None
and service_call.data[CONF_NAME].lower() in process.name.lower()
]

View File

@ -13,7 +13,7 @@ from systembridgeconnector.exceptions import (
ConnectionErrorException,
)
from systembridgeconnector.websocket_client import WebSocketClient
from systembridgemodels.modules import GetData
from systembridgemodels.modules import GetData, Module
import voluptuous as vol
from homeassistant.components import zeroconf
@ -24,8 +24,7 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
from .data import SystemBridgeData
from .const import DATA_WAIT_TIMEOUT, DOMAIN
_LOGGER = logging.getLogger(__name__)
@ -48,16 +47,6 @@ async def _validate_input(
Data has the keys from STEP_USER_DATA_SCHEMA with values provided by the user.
"""
system_bridge_data = SystemBridgeData()
async def _async_handle_module(
module_name: str,
module: Any,
) -> None:
"""Handle data from the WebSocket client."""
_LOGGER.debug("Set new data for: %s", module_name)
setattr(system_bridge_data, module_name, module)
websocket_client = WebSocketClient(
data[CONF_HOST],
data[CONF_PORT],
@ -66,17 +55,11 @@ async def _validate_input(
)
try:
async with asyncio.timeout(15):
async with asyncio.timeout(DATA_WAIT_TIMEOUT):
await websocket_client.connect()
hass.async_create_task(
websocket_client.listen(callback=_async_handle_module)
modules_data = await websocket_client.get_data(
GetData(modules=[Module.SYSTEM])
)
response = await websocket_client.get_data(GetData(modules=["system"]))
_LOGGER.debug("Got response: %s", response)
if response is None:
raise CannotConnect("No data received")
while system_bridge_data.system is None:
await asyncio.sleep(0.2)
except AuthenticationException as exception:
_LOGGER.warning(
"Authentication error when connecting to %s: %s",
@ -98,9 +81,13 @@ async def _validate_input(
except ValueError as exception:
raise CannotConnect from exception
_LOGGER.debug("Got System data: %s", system_bridge_data.system)
_LOGGER.debug("Got modules data: %s", modules_data)
if modules_data is None or modules_data.system is None:
raise CannotConnect("No system data received")
return {"hostname": data[CONF_HOST], "uuid": system_bridge_data.system.uuid}
_LOGGER.debug("Got System data: %s", modules_data.system)
return {"hostname": data[CONF_HOST], "uuid": modules_data.system.uuid}
async def _async_get_info(

View File

@ -1,15 +1,21 @@
"""Constants for the System Bridge integration."""
from typing import Final
from systembridgemodels.modules import Module
DOMAIN = "system_bridge"
MODULES = [
"battery",
"cpu",
"disks",
"displays",
"gpus",
"media",
"memory",
"processes",
"system",
MODULES: Final[list[Module]] = [
Module.BATTERY,
Module.CPU,
Module.DISKS,
Module.DISPLAYS,
Module.GPUS,
Module.MEDIA,
Module.MEMORY,
Module.PROCESSES,
Module.SYSTEM,
]
DATA_WAIT_TIMEOUT: Final[int] = 10

View File

@ -2,7 +2,7 @@
from __future__ import annotations
import asyncio
from asyncio import Task
from collections.abc import Callable
from datetime import timedelta
import logging
@ -14,12 +14,12 @@ from systembridgeconnector.exceptions import (
ConnectionErrorException,
)
from systembridgeconnector.websocket_client import WebSocketClient
from systembridgemodels.media_directories import MediaDirectory
from systembridgemodels.media_files import MediaFile, MediaFiles
from systembridgemodels.media_get_file import MediaGetFile
from systembridgemodels.media_get_files import MediaGetFiles
from systembridgemodels.modules import GetData, RegisterDataListener
from systembridgemodels.response import Response
from systembridgemodels.modules import (
GetData,
Module,
ModulesData,
RegisterDataListener,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@ -51,12 +51,13 @@ class SystemBridgeDataUpdateCoordinator(DataUpdateCoordinator[SystemBridgeData])
self.title = entry.title
self.unsub: Callable | None = None
self.systembridge_data = SystemBridgeData()
self.listen_task: Task | None = None
self.websocket_client = WebSocketClient(
entry.data[CONF_HOST],
entry.data[CONF_PORT],
entry.data[CONF_TOKEN],
api_host=entry.data[CONF_HOST],
api_port=entry.data[CONF_PORT],
token=entry.data[CONF_TOKEN],
session=async_get_clientsession(hass),
can_close_session=False,
)
self._host = entry.data[CONF_HOST]
@ -68,56 +69,62 @@ class SystemBridgeDataUpdateCoordinator(DataUpdateCoordinator[SystemBridgeData])
update_interval=timedelta(seconds=30),
)
@property
def is_ready(self) -> bool:
"""Return if the data is ready."""
if self.data is None:
return False
for module in MODULES:
if getattr(self.data, module) is None:
self.logger.debug("%s - Module %s is None", self.title, module)
return False
return True
self.data = SystemBridgeData()
async def check_websocket_connected(self) -> None:
"""Check if WebSocket is connected."""
self.logger.debug(
"[check_websocket_connected] WebSocket connected: %s",
self.websocket_client.connected,
)
if not self.websocket_client.connected:
try:
await self.websocket_client.connect()
except ConnectionErrorException as exception:
self.logger.warning(
"[check_websocket_connected] Connection error occurred for %s: %s",
self.title,
exception,
)
await self.clean_disconnect()
async def close_websocket(self) -> None:
"""Close WebSocket connection."""
await self.websocket_client.close()
if self.listen_task is not None:
self.listen_task.cancel(
msg="WebSocket closed on Home Assistant shutdown",
)
async def clean_disconnect(self) -> None:
"""Clean disconnect WebSocket."""
if self.unsub:
self.unsub()
self.unsub = None
self.last_update_success = False
self.async_update_listeners()
if self.listen_task is not None:
self.listen_task.cancel(
msg="WebSocket disconnected",
)
async def async_get_data(
self,
modules: list[str],
) -> Response:
modules: list[Module],
) -> ModulesData:
"""Get data from WebSocket."""
if not self.websocket_client.connected:
await self._setup_websocket()
await self.check_websocket_connected()
return await self.websocket_client.get_data(GetData(modules=modules))
modules_data = await self.websocket_client.get_data(GetData(modules=modules))
async def async_get_media_directories(self) -> list[MediaDirectory]:
"""Get media directories."""
return await self.websocket_client.get_directories()
# Merge new data with existing data
for module in MODULES:
if hasattr(modules_data, module):
self.logger.debug("[async_get_data] Set new data for: %s", module)
setattr(self.data, module, getattr(modules_data, module))
async def async_get_media_files(
self,
base: str,
path: str | None = None,
) -> MediaFiles:
"""Get media files."""
return await self.websocket_client.get_files(
MediaGetFiles(
base=base,
path=path,
)
)
async def async_get_media_file(
self,
base: str,
path: str,
) -> MediaFile | None:
"""Get media file."""
return await self.websocket_client.get_file(
MediaGetFile(
base=base,
path=path,
)
)
return modules_data
async def async_handle_module(
self,
@ -125,117 +132,79 @@ class SystemBridgeDataUpdateCoordinator(DataUpdateCoordinator[SystemBridgeData])
module: Any,
) -> None:
"""Handle data from the WebSocket client."""
self.logger.debug("Set new data for: %s", module_name)
setattr(self.systembridge_data, module_name, module)
self.async_set_updated_data(self.systembridge_data)
self.logger.debug("[async_handle_module] Set new data for: %s", module_name)
setattr(self.data, module_name, module)
self.async_set_updated_data(self.data)
async def _listen_for_data(self) -> None:
"""Listen for events from the WebSocket."""
try:
await self.websocket_client.listen(callback=self.async_handle_module)
except AuthenticationException as exception:
self.last_update_success = False
self.logger.error(
"Authentication failed while listening for %s: %s",
self.title,
exception,
)
if self.unsub:
self.unsub()
self.unsub = None
self.last_update_success = False
self.async_update_listeners()
await self.clean_disconnect()
except (ConnectionClosedException, ConnectionResetError) as exception:
self.logger.debug(
"Websocket connection closed for %s. Will retry: %s",
"[_listen_for_data] Websocket connection closed for %s: %s",
self.title,
exception,
)
if self.unsub:
self.unsub()
self.unsub = None
self.last_update_success = False
self.async_update_listeners()
await self.clean_disconnect()
except ConnectionErrorException as exception:
self.logger.debug(
"Connection error occurred for %s. Will retry: %s",
"[_listen_for_data] Connection error occurred for %s: %s",
self.title,
exception,
)
if self.unsub:
self.unsub()
self.unsub = None
self.last_update_success = False
self.async_update_listeners()
async def _setup_websocket(self) -> None:
"""Use WebSocket for updates."""
try:
async with asyncio.timeout(20):
await self.websocket_client.connect()
self.hass.async_create_background_task(
self._listen_for_data(),
name="System Bridge WebSocket Listener",
)
await self.websocket_client.register_data_listener(
RegisterDataListener(modules=MODULES)
)
self.last_update_success = True
self.async_update_listeners()
except AuthenticationException as exception:
self.logger.error(
"Authentication failed at setup for %s: %s", self.title, exception
)
if self.unsub:
self.unsub()
self.unsub = None
self.last_update_success = False
self.async_update_listeners()
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="authentication_failed",
translation_placeholders={
"title": self.title,
"host": self._host,
},
) from exception
except ConnectionErrorException as exception:
self.logger.warning(
"Connection error occurred for %s. Will retry: %s",
self.title,
exception,
)
self.last_update_success = False
self.async_update_listeners()
except TimeoutError as exception:
self.logger.warning(
"Timed out waiting for %s. Will retry: %s",
self.title,
exception,
)
self.last_update_success = False
self.async_update_listeners()
async def close_websocket(_) -> None:
"""Close WebSocket connection."""
await self.websocket_client.close()
# Clean disconnect WebSocket on Home Assistant shutdown
self.unsub = self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, close_websocket
)
await self.clean_disconnect()
async def _async_update_data(self) -> SystemBridgeData:
"""Update System Bridge data from WebSocket."""
self.logger.debug(
"_async_update_data - WebSocket Connected: %s",
self.websocket_client.connected,
)
if not self.websocket_client.connected:
await self._setup_websocket()
if self.listen_task is None or not self.websocket_client.connected:
await self.check_websocket_connected()
self.logger.debug("_async_update_data done")
self.logger.debug("Create listener task for %s", self.title)
self.listen_task = self.hass.async_create_background_task(
self._listen_for_data(),
name="System Bridge WebSocket Listener",
eager_start=False,
)
self.logger.debug("Listening for data from %s", self.title)
return self.systembridge_data
try:
await self.websocket_client.register_data_listener(
RegisterDataListener(modules=MODULES)
)
except AuthenticationException as exception:
self.logger.error(
"Authentication failed at setup for %s: %s", self.title, exception
)
await self.clean_disconnect()
raise ConfigEntryAuthFailed from exception
except (ConnectionClosedException, ConnectionErrorException) as exception:
self.logger.warning(
"[register] Connection error occurred for %s: %s",
self.title,
exception,
)
await self.clean_disconnect()
return self.data
self.logger.debug("Registered data listener for %s", self.title)
self.last_update_success = True
self.async_update_listeners()
# Clean disconnect WebSocket on Home Assistant shutdown
self.unsub = self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP,
lambda _: self.close_websocket(),
)
self.logger.debug("[_async_update_data] Done")
return self.data

View File

@ -10,6 +10,6 @@
"iot_class": "local_push",
"loggers": ["systembridgeconnector"],
"quality_scale": "silver",
"requirements": ["systembridgeconnector==4.0.3", "systembridgemodels==4.0.4"],
"requirements": ["systembridgeconnector==4.1.0", "systembridgemodels==4.1.0"],
"zeroconf": ["_system-bridge._tcp.local."]
}

View File

@ -4,6 +4,7 @@ from __future__ import annotations
from systembridgemodels.media_directories import MediaDirectory
from systembridgemodels.media_files import MediaFile, MediaFiles
from systembridgemodels.media_get_files import MediaGetFiles
from homeassistant.components.media_player import MediaClass
from homeassistant.components.media_source import MEDIA_CLASS_MAP, MEDIA_MIME_TYPES
@ -68,7 +69,7 @@ class SystemBridgeSource(MediaSource):
coordinator: SystemBridgeDataUpdateCoordinator = self.hass.data[DOMAIN].get(
entry.entry_id
)
directories = await coordinator.async_get_media_directories()
directories = await coordinator.websocket_client.get_directories()
return _build_root_paths(entry, directories)
entry_id, path = item.identifier.split("~~", 1)
@ -80,8 +81,11 @@ class SystemBridgeSource(MediaSource):
path_split = path.split("/", 1)
files = await coordinator.async_get_media_files(
path_split[0], path_split[1] if len(path_split) > 1 else None
files = await coordinator.websocket_client.get_files(
MediaGetFiles(
base=path_split[0],
path=path_split[1] if len(path_split) > 1 else None,
)
)
return _build_media_items(entry, files, path, item.identifier)

View File

@ -2679,10 +2679,10 @@ switchbot-api==2.2.1
synology-srm==0.2.0
# homeassistant.components.system_bridge
systembridgeconnector==4.0.3
systembridgeconnector==4.1.0
# homeassistant.components.system_bridge
systembridgemodels==4.0.4
systembridgemodels==4.1.0
# homeassistant.components.tailscale
tailscale==0.6.1

View File

@ -2104,10 +2104,10 @@ surepy==0.9.0
switchbot-api==2.2.1
# homeassistant.components.system_bridge
systembridgeconnector==4.0.3
systembridgeconnector==4.1.0
# homeassistant.components.system_bridge
systembridgemodels==4.0.4
systembridgemodels==4.1.0
# homeassistant.components.tailscale
tailscale==0.6.1

View File

@ -1,38 +1,52 @@
"""Tests for the System Bridge integration."""
from collections.abc import Awaitable, Callable
from dataclasses import asdict
from ipaddress import ip_address
from typing import Any
from systembridgeconnector.const import TYPE_DATA_UPDATE
from systembridgemodels.const import MODEL_SYSTEM
from systembridgemodels.modules import System
from systembridgemodels.response import Response
from systembridgemodels.fixtures.modules.battery import FIXTURE_BATTERY
from systembridgemodels.fixtures.modules.cpu import FIXTURE_CPU
from systembridgemodels.fixtures.modules.disks import FIXTURE_DISKS
from systembridgemodels.fixtures.modules.displays import FIXTURE_DISPLAYS
from systembridgemodels.fixtures.modules.gpus import FIXTURE_GPUS
from systembridgemodels.fixtures.modules.media import FIXTURE_MEDIA
from systembridgemodels.fixtures.modules.memory import FIXTURE_MEMORY
from systembridgemodels.fixtures.modules.processes import FIXTURE_PROCESSES
from systembridgemodels.fixtures.modules.system import FIXTURE_SYSTEM
from systembridgemodels.modules import Module, ModulesData
from homeassistant.components import zeroconf
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_TOKEN
from homeassistant.core import HomeAssistant
FIXTURE_MAC_ADDRESS = "aa:bb:cc:dd:ee:ff"
FIXTURE_UUID = "e91bf575-56f3-4c83-8f42-70ac17adcd33"
from tests.common import MockConfigEntry
FIXTURE_AUTH_INPUT = {CONF_TOKEN: "abc-123-def-456-ghi"}
FIXTURE_TITLE = "TestSystem"
FIXTURE_REQUEST_ID = "test"
FIXTURE_MAC_ADDRESS = FIXTURE_SYSTEM.mac_address
FIXTURE_UUID = FIXTURE_SYSTEM.uuid
FIXTURE_AUTH_INPUT = {
CONF_TOKEN: "abc-123-def-456-ghi",
}
FIXTURE_USER_INPUT = {
CONF_TOKEN: "abc-123-def-456-ghi",
CONF_HOST: "test-bridge",
CONF_HOST: "127.0.0.1",
CONF_PORT: "9170",
}
FIXTURE_ZEROCONF_INPUT = {
CONF_TOKEN: "abc-123-def-456-ghi",
CONF_HOST: "1.1.1.1",
CONF_HOST: FIXTURE_USER_INPUT[CONF_HOST],
CONF_PORT: "9170",
}
FIXTURE_ZEROCONF = zeroconf.ZeroconfServiceInfo(
ip_address=ip_address("1.1.1.1"),
ip_addresses=[ip_address("1.1.1.1")],
ip_address=ip_address(FIXTURE_USER_INPUT[CONF_HOST]),
ip_addresses=[ip_address(FIXTURE_USER_INPUT[CONF_HOST])],
port=9170,
hostname="test-bridge.local.",
type="_system-bridge._tcp.local.",
@ -41,7 +55,7 @@ FIXTURE_ZEROCONF = zeroconf.ZeroconfServiceInfo(
"address": "http://test-bridge:9170",
"fqdn": "test-bridge",
"host": "test-bridge",
"ip": "1.1.1.1",
"ip": FIXTURE_USER_INPUT[CONF_HOST],
"mac": FIXTURE_MAC_ADDRESS,
"port": "9170",
"uuid": FIXTURE_UUID,
@ -49,8 +63,8 @@ FIXTURE_ZEROCONF = zeroconf.ZeroconfServiceInfo(
)
FIXTURE_ZEROCONF_BAD = zeroconf.ZeroconfServiceInfo(
ip_address=ip_address("1.1.1.1"),
ip_addresses=[ip_address("1.1.1.1")],
ip_address=ip_address(FIXTURE_USER_INPUT[CONF_HOST]),
ip_addresses=[ip_address(FIXTURE_USER_INPUT[CONF_HOST])],
port=9170,
hostname="test-bridge.local.",
type="_system-bridge._tcp.local.",
@ -60,57 +74,37 @@ FIXTURE_ZEROCONF_BAD = zeroconf.ZeroconfServiceInfo(
},
)
FIXTURE_SYSTEM = System(
boot_time=1,
fqdn="",
hostname="1.1.1.1",
ip_address_4="1.1.1.1",
mac_address=FIXTURE_MAC_ADDRESS,
platform="",
platform_version="",
uptime=1,
uuid=FIXTURE_UUID,
version="",
version_latest="",
version_newer_available=False,
users=[],
FIXTURE_DATA_RESPONSE = ModulesData(
system=FIXTURE_SYSTEM,
)
FIXTURE_DATA_RESPONSE = Response(
id="1234",
type=TYPE_DATA_UPDATE,
subtype=None,
message="Data received",
module=MODEL_SYSTEM,
data=asdict(FIXTURE_SYSTEM),
)
FIXTURE_DATA_RESPONSE_BAD = Response(
id="1234",
type=TYPE_DATA_UPDATE,
subtype=None,
message="Data received",
module=MODEL_SYSTEM,
data={},
)
async def setup_integration(
hass: HomeAssistant,
config_entry: MockConfigEntry,
) -> bool:
"""Fixture for setting up the component."""
config_entry.add_to_hass(hass)
FIXTURE_DATA_RESPONSE_BAD = Response(
id="1234",
type=TYPE_DATA_UPDATE,
subtype=None,
message="Data received",
module=MODEL_SYSTEM,
data={},
)
setup_result = await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
return setup_result
async def mock_data_listener(
self,
callback: Callable[[str, Any], Awaitable[None]] | None = None,
_: bool = False,
):
"""Mock websocket data listener."""
if callback is not None:
# Simulate data received from the websocket
await callback(MODEL_SYSTEM, FIXTURE_SYSTEM)
await callback(Module.BATTERY, FIXTURE_BATTERY)
await callback(Module.CPU, FIXTURE_CPU)
await callback(Module.DISKS, FIXTURE_DISKS)
await callback(Module.DISPLAYS, FIXTURE_DISPLAYS)
await callback(Module.GPUS, FIXTURE_GPUS)
await callback(Module.MEDIA, FIXTURE_MEDIA)
await callback(Module.MEMORY, FIXTURE_MEMORY)
await callback(Module.PROCESSES, FIXTURE_PROCESSES)
await callback(Module.SYSTEM, FIXTURE_SYSTEM)

View File

@ -0,0 +1,195 @@
"""Fixtures for System Bridge integration tests."""
from __future__ import annotations
from collections.abc import Generator
from typing import Final
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from systembridgeconnector.const import EventKey, EventType
from systembridgemodels.fixtures.modules.battery import FIXTURE_BATTERY
from systembridgemodels.fixtures.modules.cpu import FIXTURE_CPU
from systembridgemodels.fixtures.modules.disks import FIXTURE_DISKS
from systembridgemodels.fixtures.modules.displays import FIXTURE_DISPLAYS
from systembridgemodels.fixtures.modules.gpus import FIXTURE_GPUS
from systembridgemodels.fixtures.modules.media import FIXTURE_MEDIA
from systembridgemodels.fixtures.modules.memory import FIXTURE_MEMORY
from systembridgemodels.fixtures.modules.networks import FIXTURE_NETWORKS
from systembridgemodels.fixtures.modules.processes import FIXTURE_PROCESSES
from systembridgemodels.fixtures.modules.sensors import FIXTURE_SENSORS
from systembridgemodels.fixtures.modules.system import FIXTURE_SYSTEM
from systembridgemodels.media_directories import MediaDirectory
from systembridgemodels.media_files import MediaFile, MediaFiles
from systembridgemodels.modules import Module, ModulesData, RegisterDataListener
from systembridgemodels.response import Response
from homeassistant.components.system_bridge.config_flow import SystemBridgeConfigFlow
from homeassistant.components.system_bridge.const import DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_TOKEN
from homeassistant.core import HomeAssistant
from . import (
FIXTURE_REQUEST_ID,
FIXTURE_TITLE,
FIXTURE_USER_INPUT,
FIXTURE_UUID,
mock_data_listener,
setup_integration,
)
from tests.common import MockConfigEntry
REGISTER_MODULES: Final[list[Module]] = [
Module.SYSTEM,
]
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Mock ConfigEntry."""
return MockConfigEntry(
title=FIXTURE_TITLE,
domain=DOMAIN,
unique_id=FIXTURE_UUID,
version=SystemBridgeConfigFlow.VERSION,
minor_version=SystemBridgeConfigFlow.MINOR_VERSION,
data={
CONF_HOST: FIXTURE_USER_INPUT[CONF_HOST],
CONF_PORT: FIXTURE_USER_INPUT[CONF_PORT],
CONF_TOKEN: FIXTURE_USER_INPUT[CONF_TOKEN],
},
)
@pytest.fixture(autouse=True)
def mock_setup_notify_platform() -> Generator[AsyncMock, None, None]:
"""Mock notify platform setup."""
with patch(
"homeassistant.helpers.discovery.async_load_platform",
) as mock_setup_notify_platform:
yield mock_setup_notify_platform
@pytest.fixture
def mock_version() -> Generator[AsyncMock, None, None]:
"""Return a mocked Version class."""
with patch(
"homeassistant.components.system_bridge.Version",
autospec=True,
) as mock_version:
version = mock_version.return_value
version.check_supported.return_value = True
yield version
@pytest.fixture
def mock_websocket_client(
register_data_listener_model: RegisterDataListener = RegisterDataListener(
modules=REGISTER_MODULES,
),
) -> Generator[MagicMock, None, None]:
"""Return a mocked WebSocketClient client."""
with (
patch(
"homeassistant.components.system_bridge.coordinator.WebSocketClient",
autospec=True,
) as mock_websocket_client,
patch(
"homeassistant.components.system_bridge.config_flow.WebSocketClient",
new=mock_websocket_client,
),
):
websocket_client = mock_websocket_client.return_value
websocket_client.connected = False
websocket_client.get_data.return_value = ModulesData(
battery=FIXTURE_BATTERY,
cpu=FIXTURE_CPU,
disks=FIXTURE_DISKS,
displays=FIXTURE_DISPLAYS,
gpus=FIXTURE_GPUS,
media=FIXTURE_MEDIA,
memory=FIXTURE_MEMORY,
networks=FIXTURE_NETWORKS,
processes=FIXTURE_PROCESSES,
sensors=FIXTURE_SENSORS,
system=FIXTURE_SYSTEM,
)
websocket_client.register_data_listener.return_value = Response(
id=FIXTURE_REQUEST_ID,
type=EventType.DATA_LISTENER_REGISTERED,
message="Data listener registered",
data={EventKey.MODULES: register_data_listener_model.modules},
)
# Trigger callback when listener is registered
websocket_client.listen.side_effect = mock_data_listener
websocket_client.get_directories.return_value = [
MediaDirectory(
key="documents",
path="/home/user/documents",
)
]
websocket_client.get_files.return_value = MediaFiles(
files=[
MediaFile(
name="testsubdirectory",
path="testsubdirectory",
fullpath="/home/user/documents/testsubdirectory",
size=100,
last_accessed=1630000000,
created=1630000000,
modified=1630000000,
is_directory=True,
is_file=False,
is_link=False,
),
MediaFile(
name="testfile.txt",
path="testfile.txt",
fullpath="/home/user/documents/testfile.txt",
size=100,
last_accessed=1630000000,
created=1630000000,
modified=1630000000,
is_directory=False,
is_file=True,
is_link=False,
mime_type="text/plain",
),
MediaFile(
name="testfile.jpg",
path="testfile.jpg",
fullpath="/home/user/documents/testimage.jpg",
size=100,
last_accessed=1630000000,
created=1630000000,
modified=1630000000,
is_directory=False,
is_file=True,
is_link=False,
mime_type="image/jpeg",
),
],
path="",
)
yield websocket_client
@pytest.fixture
async def init_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_version: MagicMock,
mock_websocket_client: MagicMock,
) -> MockConfigEntry:
"""Initialize the System Bridge integration."""
assert await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.LOADED
return mock_config_entry

View File

@ -0,0 +1,61 @@
# serializer version: 1
# name: test_directory[system_bridge_media_source_directory]
dict({
'can_expand': True,
'can_play': False,
'children_media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'TestSystem - documents',
})
# ---
# name: test_entry[system_bridge_media_source_entry]
dict({
'can_expand': True,
'can_play': False,
'children_media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'TestSystem',
})
# ---
# name: test_file[system_bridge_media_source_file_image]
dict({
'mime_type': 'image/jpeg',
'url': 'http://127.0.0.1:9170/api/media/file/data?token=abc-123-def-456-ghi&base=documents&path=testimage.jpg',
})
# ---
# name: test_file[system_bridge_media_source_file_text]
dict({
'mime_type': 'text/plain',
'url': 'http://127.0.0.1:9170/api/media/file/data?token=abc-123-def-456-ghi&base=documents&path=testfile.txt',
})
# ---
# name: test_root[system_bridge_media_source_root]
dict({
'can_expand': True,
'can_play': False,
'children_media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'System Bridge',
})
# ---
# name: test_subdirectory[system_bridge_media_source_subdirectory]
dict({
'can_expand': True,
'can_play': False,
'children_media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_class': <MediaClass.DIRECTORY: 'directory'>,
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'TestSystem - documents/testsubdirectory',
})
# ---

View File

@ -69,7 +69,7 @@ async def test_user_flow(hass: HomeAssistant) -> None:
await hass.async_block_till_done()
assert result2["type"] is FlowResultType.CREATE_ENTRY
assert result2["title"] == "test-bridge"
assert result2["title"] == "127.0.0.1"
assert result2["data"] == FIXTURE_USER_INPUT
assert len(mock_setup_entry.mock_calls) == 1
@ -441,7 +441,7 @@ async def test_zeroconf_flow(hass: HomeAssistant) -> None:
await hass.async_block_till_done()
assert result2["type"] is FlowResultType.CREATE_ENTRY
assert result2["title"] == "1.1.1.1"
assert result2["title"] == "127.0.0.1"
assert result2["data"] == FIXTURE_ZEROCONF_INPUT
assert len(mock_setup_entry.mock_calls) == 1

View File

@ -0,0 +1,148 @@
"""Test the System Bridge integration."""
import pytest
from syrupy.assertion import SnapshotAssertion
from syrupy.filters import paths
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source import (
DOMAIN as MEDIA_SOURCE_DOMAIN,
URI_SCHEME,
async_browse_media,
async_resolve_media,
)
from homeassistant.components.system_bridge.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
@pytest.fixture(autouse=True)
async def setup_component(hass: HomeAssistant) -> None:
"""Set up component."""
assert await async_setup_component(
hass,
MEDIA_SOURCE_DOMAIN,
{},
)
async def test_root(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
init_integration: MockConfigEntry,
) -> None:
"""Test root media browsing."""
browse_media_root = await async_browse_media(
hass,
f"{URI_SCHEME}{DOMAIN}",
)
assert browse_media_root.as_dict() == snapshot(
name=f"{DOMAIN}_media_source_root",
exclude=paths("children", "media_content_id"),
)
async def test_entry(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
init_integration: MockConfigEntry,
) -> None:
"""Test browsing entry."""
browse_media_entry = await async_browse_media(
hass,
f"{URI_SCHEME}{DOMAIN}/{init_integration.entry_id}",
)
assert browse_media_entry.as_dict() == snapshot(
name=f"{DOMAIN}_media_source_entry",
exclude=paths("children", "media_content_id"),
)
async def test_directory(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
init_integration: MockConfigEntry,
) -> None:
"""Test browsing directory."""
browse_media_directory = await async_browse_media(
hass,
f"{URI_SCHEME}{DOMAIN}/{init_integration.entry_id}~~documents",
)
assert browse_media_directory.as_dict() == snapshot(
name=f"{DOMAIN}_media_source_directory",
exclude=paths("children", "media_content_id"),
)
async def test_subdirectory(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
init_integration: MockConfigEntry,
) -> None:
"""Test browsing directory."""
browse_media_directory = await async_browse_media(
hass,
f"{URI_SCHEME}{DOMAIN}/{init_integration.entry_id}~~documents/testsubdirectory",
)
assert browse_media_directory.as_dict() == snapshot(
name=f"{DOMAIN}_media_source_subdirectory",
exclude=paths("children", "media_content_id"),
)
async def test_file(
hass: HomeAssistant,
snapshot: SnapshotAssertion,
init_integration: MockConfigEntry,
) -> None:
"""Test browsing file."""
resolve_media_file = await async_resolve_media(
hass,
f"{URI_SCHEME}{DOMAIN}/{init_integration.entry_id}~~documents/testfile.txt~~text/plain",
None,
)
assert resolve_media_file == snapshot(
name=f"{DOMAIN}_media_source_file_text",
)
resolve_media_file = await async_resolve_media(
hass,
f"{URI_SCHEME}{DOMAIN}/{init_integration.entry_id}~~documents/testimage.jpg~~image/jpeg",
None,
)
assert resolve_media_file == snapshot(
name=f"{DOMAIN}_media_source_file_image",
)
async def test_bad_entry(
hass: HomeAssistant,
init_integration: MockConfigEntry,
) -> None:
"""Test invalid entry raises BrowseError."""
with pytest.raises(BrowseError):
await async_browse_media(
hass,
f"{URI_SCHEME}{DOMAIN}/badentryid",
)
with pytest.raises(BrowseError):
await async_browse_media(
hass,
f"{URI_SCHEME}{DOMAIN}/badentryid~~baddirectory",
)
with pytest.raises(ValueError):
await async_resolve_media(
hass,
f"{URI_SCHEME}{DOMAIN}/badentryid~~baddirectory/badfile.txt~~text/plain",
None,
)