Update System Bridge to support version 4.x.x and above (#107957)

* Update System Bridge to support version 4.x.x and above

Update systembridgeconnector to version 4.0.0.dev4

Update system_bridgeconnector version to 4.0.0.dev6

Refactor WebSocket client handling in config_flow.py

Update strings

Update data handling

Add default field values to SystemBridgeCoordinatorData

Add version check and issue creation for unsupported System Bridge versions

Update coordinator.py to set disks and memory to None

Update system bridge coordinator to use token instead of API key

Update systembridgeconnector version to 4.0.0.dev7

Update systembridgeconnector version to 4.0.0.dev8

Update systembridgeconnector version to 4.0.0.dev9

Changes

Update units

Fix GPU memory calculation in sensor.py

Update GPU memory unit of measurement

Add translation keys for binary sensor names

Cleanup

Add async_migrate_entry function for entry migration

Update systembridgeconnector version to 4.0.0.dev10

Update systembridgeconnector version to 4.0.0.dev11

Add version check and authentication handling

Update token description in strings.json

Fix skipping partitions without data in system_bridge sensor

Update systembridgeconnector version to 4.0.0.dev12

Update systembridgeconnector version to 4.0.0

Add check for unsupported version of System Bridge

Update systembridgeconnector version to 4.0.1

Update debug log message in async_setup_entry function

Remove debug log statement

Fixes

Update key to token

Update tests

Update tests

Remove unused import in test_config_flow.py

Remove added missing translations for another PR

Refactor CPU power per CPU calculation

Make one liner into lambda

Refactors

Fix exception type in async_setup_entry function

Move checks to class and set minor version

Remove unnecessary comment in gpu_memory_free function

Remove translation_key for memory_used_percentage sensor

Reverse string change

Update token placeholder in strings.json

Remove suggested_display_precision from sensor descriptions

Remove suggested_display_precision from GPU sensor setup

Refactor sensor code

* Update migrate entry

* Refactor GPU-related functions to use a decorator

* Move per cpu functions to use decorator

* Refactor functions to use decorators for data availability

* Remove CONF_API_KEY from config entry data

* Add test for migration

* Refactor import statement in test_config_flow.py
This commit is contained in:
Aidan Timson 2024-03-04 10:14:46 +00:00 committed by GitHub
parent faef5da1c5
commit 4c67670566
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 569 additions and 349 deletions

View File

@ -9,7 +9,7 @@ from systembridgeconnector.exceptions import (
ConnectionClosedException,
ConnectionErrorException,
)
from systembridgeconnector.version import SUPPORTED_VERSION, Version
from systembridgeconnector.version import Version
from systembridgemodels.keyboard_key import KeyboardKey
from systembridgemodels.keyboard_text import KeyboardText
from systembridgemodels.open_path import OpenPath
@ -25,6 +25,7 @@ from homeassistant.const import (
CONF_NAME,
CONF_PATH,
CONF_PORT,
CONF_TOKEN,
CONF_URL,
Platform,
)
@ -36,6 +37,7 @@ from homeassistant.helpers import (
discovery,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from .const import DOMAIN, MODULES
from .coordinator import SystemBridgeDataUpdateCoordinator
@ -80,16 +82,13 @@ async def async_setup_entry(
version = Version(
entry.data[CONF_HOST],
entry.data[CONF_PORT],
entry.data[CONF_API_KEY],
entry.data[CONF_TOKEN],
session=async_get_clientsession(hass),
)
supported = False
try:
async with asyncio.timeout(10):
if not await version.check_supported():
raise ConfigEntryNotReady(
"You are not running a supported version of System Bridge. Please"
f" update to {SUPPORTED_VERSION} or higher."
)
supported = await version.check_supported()
except AuthenticationException as exception:
_LOGGER.error("Authentication failed for %s: %s", entry.title, exception)
raise ConfigEntryAuthFailed from exception
@ -102,6 +101,21 @@ async def async_setup_entry(
f"Timed out waiting for {entry.title} ({entry.data[CONF_HOST]})."
) from exception
# If not supported, create an issue and raise ConfigEntryNotReady
if not supported:
async_create_issue(
hass=hass,
domain=DOMAIN,
issue_id=f"system_bridge_{entry.entry_id}_unsupported_version",
translation_key="unsupported_version",
translation_placeholders={"host": entry.data[CONF_HOST]},
severity=IssueSeverity.ERROR,
is_fixable=False,
)
raise ConfigEntryNotReady(
"You are not running a supported version of System Bridge. Please update to the latest version."
)
coordinator = SystemBridgeDataUpdateCoordinator(
hass,
_LOGGER,
@ -122,6 +136,7 @@ async def async_setup_entry(
f"Timed out waiting for {entry.title} ({entry.data[CONF_HOST]})."
) from exception
# Fetch initial data so we have data when entities subscribe
await coordinator.async_config_entry_first_refresh()
try:
@ -139,13 +154,6 @@ async def async_setup_entry(
f"Timed out waiting for {entry.title} ({entry.data[CONF_HOST]})."
) from exception
_LOGGER.debug(
"Initial coordinator data for %s (%s):\n%s",
entry.title,
entry.data[CONF_HOST],
coordinator.data.json(),
)
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = coordinator
@ -183,7 +191,7 @@ async def async_setup_entry(
if entry.entry_id in device_entry.config_entries
)
except StopIteration as exception:
raise vol.Invalid from exception
raise vol.Invalid(f"Could not find device {device}") from exception
raise vol.Invalid(f"Device {device} does not exist")
async def handle_open_path(call: ServiceCall) -> None:
@ -328,3 +336,29 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Reload the config entry when it changed."""
await hass.config_entries.async_reload(entry.entry_id)
async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Migrate old entry."""
_LOGGER.debug("Migrating from version %s", config_entry.version)
if config_entry.version == 1 and config_entry.minor_version == 1:
# Migrate to CONF_TOKEN, which was added in 1.2
new_data = dict(config_entry.data)
new_data.setdefault(CONF_TOKEN, config_entry.data.get(CONF_API_KEY))
new_data.pop(CONF_API_KEY, None)
hass.config_entries.async_update_entry(
config_entry,
data=new_data,
minor_version=2,
)
_LOGGER.debug(
"Migration to version %s.%s successful",
config_entry.version,
config_entry.minor_version,
)
# User is trying to downgrade from a future version
return False

View File

@ -12,13 +12,12 @@ from systembridgeconnector.exceptions import (
ConnectionErrorException,
)
from systembridgeconnector.websocket_client import WebSocketClient
from systembridgemodels.get_data import GetData
from systembridgemodels.system import System
from systembridgemodels.modules import GetData, System
import voluptuous as vol
from homeassistant.components import zeroconf
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_API_KEY, CONF_HOST, CONF_PORT
from homeassistant.const import CONF_API_KEY, CONF_HOST, CONF_PORT, CONF_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
@ -28,12 +27,12 @@ from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
STEP_AUTHENTICATE_DATA_SCHEMA = vol.Schema({vol.Required(CONF_API_KEY): cv.string})
STEP_AUTHENTICATE_DATA_SCHEMA = vol.Schema({vol.Required(CONF_TOKEN): cv.string})
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PORT, default=9170): cv.string,
vol.Required(CONF_API_KEY): cv.string,
vol.Required(CONF_TOKEN): cv.string,
}
)
@ -114,12 +113,100 @@ class SystemBridgeConfigFlow(
"""Handle a config flow for System Bridge."""
VERSION = 1
MINOR_VERSION = 2
def __init__(self) -> None:
"""Initialize flow."""
self._name: str | None = None
self._input: dict[str, Any] = {}
self._reauth = False
self._system_data: System | None = None
async def _validate_input(
self,
data: dict[str, Any],
) -> dict[str, str]:
"""Validate the user input allows us to connect.
Data has the keys from STEP_USER_DATA_SCHEMA with values provided by the user.
"""
host = data[CONF_HOST]
websocket_client = WebSocketClient(
host,
data[CONF_PORT],
data[CONF_TOKEN],
)
async def async_handle_module(
module_name: str,
module: Any,
) -> None:
"""Handle data from the WebSocket client."""
_LOGGER.debug("Set new data for: %s", module_name)
if module_name == "system":
self._system_data = module
try:
async with asyncio.timeout(15):
await websocket_client.connect(
session=async_get_clientsession(self.hass)
)
self.hass.async_create_task(
websocket_client.listen(callback=async_handle_module)
)
response = await websocket_client.get_data(GetData(modules=["system"]))
_LOGGER.debug("Got response: %s", response)
if response is None:
raise CannotConnect("No data received")
while self._system_data is None:
await asyncio.sleep(0.2)
except AuthenticationException as exception:
_LOGGER.warning(
"Authentication error when connecting to %s: %s",
data[CONF_HOST],
exception,
)
raise InvalidAuth from exception
except (
ConnectionClosedException,
ConnectionErrorException,
) as exception:
_LOGGER.warning(
"Connection error when connecting to %s: %s", data[CONF_HOST], exception
)
raise CannotConnect from exception
except TimeoutError as exception:
_LOGGER.warning(
"Timed out connecting to %s: %s", data[CONF_HOST], exception
)
raise CannotConnect from exception
except ValueError as exception:
raise CannotConnect from exception
_LOGGER.debug("Got System data: %s", self._system_data)
return {"hostname": host, "uuid": self._system_data.uuid}
async def _async_get_info(
self,
user_input: dict[str, Any],
) -> tuple[dict[str, str], dict[str, str] | None]:
errors = {}
try:
info = await self._validate_input(user_input)
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
return errors, info
return errors, None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
@ -130,7 +217,7 @@ class SystemBridgeConfigFlow(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
)
errors, info = await _async_get_info(self.hass, user_input)
errors, info = await self._async_get_info(user_input)
if not errors and info is not None:
# Check if already configured
await self.async_set_unique_id(info["uuid"], raise_on_progress=False)
@ -150,7 +237,7 @@ class SystemBridgeConfigFlow(
if user_input is not None:
user_input = {**self._input, **user_input}
errors, info = await _async_get_info(self.hass, user_input)
errors, info = await self._async_get_info(user_input)
if not errors and info is not None:
# Check if already configured
existing_entry = await self.async_set_unique_id(info["uuid"])

View File

@ -5,9 +5,9 @@ DOMAIN = "system_bridge"
MODULES = [
"battery",
"cpu",
"disk",
"display",
"gpu",
"disks",
"displays",
"gpus",
"media",
"memory",
"processes",

View File

@ -3,58 +3,63 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import timedelta
import logging
from typing import Any
from pydantic import BaseModel # pylint: disable=no-name-in-module
from systembridgeconnector.exceptions import (
AuthenticationException,
ConnectionClosedException,
ConnectionErrorException,
)
from systembridgeconnector.websocket_client import WebSocketClient
from systembridgemodels.battery import Battery
from systembridgemodels.cpu import Cpu
from systembridgemodels.disk import Disk
from systembridgemodels.display import Display
from systembridgemodels.get_data import GetData
from systembridgemodels.gpu import Gpu
from systembridgemodels.media import Media
from systembridgemodels.media_directories import MediaDirectories
from systembridgemodels.media_files import File as MediaFile, MediaFiles
from systembridgemodels.media_directories import MediaDirectory
from systembridgemodels.media_files import MediaFile, MediaFiles
from systembridgemodels.media_get_file import MediaGetFile
from systembridgemodels.media_get_files import MediaGetFiles
from systembridgemodels.memory import Memory
from systembridgemodels.processes import Processes
from systembridgemodels.register_data_listener import RegisterDataListener
from systembridgemodels.system import System
from systembridgemodels.modules import (
CPU,
GPU,
Battery,
Disks,
Display,
GetData,
Media,
Memory,
Process,
RegisterDataListener,
System,
)
from systembridgemodels.response import Response
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_API_KEY,
CONF_HOST,
CONF_PORT,
CONF_TOKEN,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import DOMAIN, MODULES
class SystemBridgeCoordinatorData(BaseModel):
@dataclass
class SystemBridgeCoordinatorData:
"""System Bridge Coordianator Data."""
battery: Battery = None
cpu: Cpu = None
disk: Disk = None
display: Display = None
gpu: Gpu = None
media: Media = None
battery: Battery = field(default_factory=Battery)
cpu: CPU = field(default_factory=CPU)
disks: Disks = None
displays: list[Display] = field(default_factory=list[Display])
gpus: list[GPU] = field(default_factory=list[GPU])
media: Media = field(default_factory=Media)
memory: Memory = None
processes: Processes = None
processes: list[Process] = field(default_factory=list[Process])
system: System = None
@ -78,11 +83,14 @@ class SystemBridgeDataUpdateCoordinator(
self.websocket_client = WebSocketClient(
entry.data[CONF_HOST],
entry.data[CONF_PORT],
entry.data[CONF_API_KEY],
entry.data[CONF_TOKEN],
)
super().__init__(
hass, LOGGER, name=DOMAIN, update_interval=timedelta(seconds=30)
hass,
LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=30),
)
@property
@ -99,16 +107,14 @@ class SystemBridgeDataUpdateCoordinator(
async def async_get_data(
self,
modules: list[str],
) -> None:
) -> Response:
"""Get data from WebSocket."""
if not self.websocket_client.connected:
await self._setup_websocket()
self.hass.async_create_task(
self.websocket_client.get_data(GetData(modules=modules))
)
return await self.websocket_client.get_data(GetData(modules=modules))
async def async_get_media_directories(self) -> MediaDirectories:
async def async_get_media_directories(self) -> list[MediaDirectory]:
"""Get media directories."""
return await self.websocket_client.get_directories()
@ -154,7 +160,11 @@ class SystemBridgeDataUpdateCoordinator(
await self.websocket_client.listen(callback=self.async_handle_module)
except AuthenticationException as exception:
self.last_update_success = False
self.logger.error("Authentication failed for %s: %s", self.title, exception)
self.logger.error(
"Authentication failed while listening for %s: %s",
self.title,
exception,
)
if self.unsub:
self.unsub()
self.unsub = None
@ -199,14 +209,18 @@ class SystemBridgeDataUpdateCoordinator(
await self.websocket_client.register_data_listener(
RegisterDataListener(modules=MODULES)
)
self.last_update_success = True
self.async_update_listeners()
except AuthenticationException as exception:
self.last_update_success = False
self.logger.error("Authentication failed for %s: %s", self.title, exception)
self.logger.error(
"Authentication failed at setup for %s: %s", self.title, exception
)
if self.unsub:
self.unsub()
self.unsub = None
self.last_update_success = False
self.async_update_listeners()
raise ConfigEntryAuthFailed from exception
except ConnectionErrorException as exception:
self.logger.warning(
"Connection error occurred for %s. Will retry: %s",
@ -224,9 +238,6 @@ class SystemBridgeDataUpdateCoordinator(
self.last_update_success = False
self.async_update_listeners()
self.last_update_success = True
self.async_update_listeners()
async def close_websocket(_) -> None:
"""Close WebSocket connection."""
await self.websocket_client.close()

View File

@ -10,6 +10,6 @@
"iot_class": "local_push",
"loggers": ["systembridgeconnector"],
"quality_scale": "silver",
"requirements": ["systembridgeconnector==3.10.0"],
"requirements": ["systembridgeconnector==4.0.1"],
"zeroconf": ["_system-bridge._tcp.local."]
}

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import datetime as dt
from typing import Final
from systembridgemodels.media_control import Action as MediaAction, MediaControl
from systembridgemodels.media_control import MediaAction, MediaControl
from homeassistant.components.media_player import (
MediaPlayerDeviceClass,
@ -202,7 +202,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Send play command."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.play,
action=MediaAction.PLAY.value,
)
)
@ -210,7 +210,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Send pause command."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.pause,
action=MediaAction.PAUSE.value,
)
)
@ -218,7 +218,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Send stop command."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.stop,
action=MediaAction.STOP.value,
)
)
@ -226,7 +226,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Send previous track command."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.previous,
action=MediaAction.PREVIOUS.value,
)
)
@ -234,7 +234,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Send next track command."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.next,
action=MediaAction.NEXT.value,
)
)
@ -245,7 +245,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Enable/disable shuffle mode."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.shuffle,
action=MediaAction.SHUFFLE.value,
value=shuffle,
)
)
@ -257,7 +257,7 @@ class SystemBridgeMediaPlayer(SystemBridgeEntity, MediaPlayerEntity):
"""Set repeat mode."""
await self.coordinator.websocket_client.media_control(
MediaControl(
action=MediaAction.repeat,
action=MediaAction.REPEAT.value,
value=MEDIA_SET_REPEAT_MAP.get(repeat),
)
)

View File

@ -1,8 +1,8 @@
"""System Bridge Media Source Implementation."""
from __future__ import annotations
from systembridgemodels.media_directories import MediaDirectories
from systembridgemodels.media_files import File as MediaFile, MediaFiles
from systembridgemodels.media_directories import MediaDirectory
from systembridgemodels.media_files import MediaFile, MediaFiles
from homeassistant.components.media_player import MediaClass
from homeassistant.components.media_source import MEDIA_CLASS_MAP, MEDIA_MIME_TYPES
@ -129,7 +129,7 @@ def _build_base_url(
def _build_root_paths(
entry: ConfigEntry,
media_directories: MediaDirectories,
media_directories: list[MediaDirectory],
) -> BrowseMediaSource:
"""Build base categories for System Bridge media."""
return BrowseMediaSource(
@ -152,7 +152,7 @@ def _build_root_paths(
children=[],
children_media_class=MediaClass.DIRECTORY,
)
for directory in media_directories.directories
for directory in media_directories
],
children_media_class=MediaClass.DIRECTORY,
)

View File

@ -71,6 +71,6 @@ class SystemBridgeNotificationService(BaseNotificationService):
title=kwargs.get(ATTR_TITLE, data.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)),
)
_LOGGER.debug("Sending notification: %s", notification.json())
_LOGGER.debug("Sending notification: %s", notification)
await self._coordinator.websocket_client.send_notification(notification)

View File

@ -6,6 +6,10 @@ from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Final, cast
from systembridgemodels.modules.cpu import PerCPU
from systembridgemodels.modules.displays import Display
from systembridgemodels.modules.gpus import GPU
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
@ -26,7 +30,7 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import UNDEFINED, StateType
from homeassistant.util.dt import utcnow
from homeassistant.util import dt as dt_util
from .const import DOMAIN
from .coordinator import SystemBridgeCoordinatorData, SystemBridgeDataUpdateCoordinator
@ -51,88 +55,174 @@ class SystemBridgeSensorEntityDescription(SensorEntityDescription):
def battery_time_remaining(data: SystemBridgeCoordinatorData) -> datetime | None:
"""Return the battery time remaining."""
if (value := getattr(data.battery, "sensors_secsleft", None)) is not None:
return utcnow() + timedelta(seconds=value)
return None
def cpu_power_package(data: SystemBridgeCoordinatorData) -> float | None:
"""Return the CPU package power."""
if data.cpu.power_package is not None:
return data.cpu.power_package
return None
def cpu_power_per_cpu(
data: SystemBridgeCoordinatorData,
cpu: int,
) -> float | None:
"""Return CPU power per CPU."""
if (value := getattr(data.cpu, f"power_per_cpu_{cpu}", None)) is not None:
return value
if (battery_time := data.battery.time_remaining) is not None:
return dt_util.utcnow() + timedelta(seconds=battery_time)
return None
def cpu_speed(data: SystemBridgeCoordinatorData) -> float | None:
"""Return the CPU speed."""
if data.cpu.frequency_current is not None:
return round(data.cpu.frequency_current / 1000, 2)
if (cpu_frequency := data.cpu.frequency) is not None and (
cpu_frequency.current
) is not None:
return round(cpu_frequency.current / 1000, 2)
return None
def gpu_core_clock_speed(data: SystemBridgeCoordinatorData, key: str) -> float | None:
def with_per_cpu(func) -> Callable:
"""Wrap a function to ensure per CPU data is available."""
def wrapper(data: SystemBridgeCoordinatorData, index: int) -> float | None:
"""Wrap a function to ensure per CPU data is available."""
if data.cpu.per_cpu is not None and index < len(data.cpu.per_cpu):
return func(data.cpu.per_cpu[index])
return None
return wrapper
@with_per_cpu
def cpu_power_per_cpu(per_cpu: PerCPU) -> float | None:
"""Return CPU power per CPU."""
return per_cpu.power
@with_per_cpu
def cpu_usage_per_cpu(per_cpu: PerCPU) -> float | None:
"""Return CPU usage per CPU."""
return per_cpu.usage
def with_display(func) -> Callable:
"""Wrap a function to ensure a Display is available."""
def wrapper(data: SystemBridgeCoordinatorData, index: int) -> Display | None:
"""Wrap a function to ensure a Display is available."""
if index < len(data.displays):
return func(data.displays[index])
return None
return wrapper
@with_display
def display_resolution_horizontal(display: Display) -> int | None:
"""Return the Display resolution horizontal."""
return display.resolution_horizontal
@with_display
def display_resolution_vertical(display: Display) -> int | None:
"""Return the Display resolution vertical."""
return display.resolution_vertical
@with_display
def display_refresh_rate(display: Display) -> float | None:
"""Return the Display refresh rate."""
return display.refresh_rate
def with_gpu(func) -> Callable:
"""Wrap a function to ensure a GPU is available."""
def wrapper(data: SystemBridgeCoordinatorData, index: int) -> GPU | None:
"""Wrap a function to ensure a GPU is available."""
if index < len(data.gpus):
return func(data.gpus[index])
return None
return wrapper
@with_gpu
def gpu_core_clock_speed(gpu: GPU) -> float | None:
"""Return the GPU core clock speed."""
if (value := getattr(data.gpu, f"{key}_core_clock", None)) is not None:
return round(value)
return None
return gpu.core_clock
def gpu_memory_clock_speed(data: SystemBridgeCoordinatorData, key: str) -> float | None:
@with_gpu
def gpu_fan_speed(gpu: GPU) -> float | None:
"""Return the GPU fan speed."""
return gpu.fan_speed
@with_gpu
def gpu_memory_clock_speed(gpu: GPU) -> float | None:
"""Return the GPU memory clock speed."""
if (value := getattr(data.gpu, f"{key}_memory_clock", None)) is not None:
return round(value)
return None
return gpu.memory_clock
def gpu_memory_free(data: SystemBridgeCoordinatorData, key: str) -> float | None:
@with_gpu
def gpu_memory_free(gpu: GPU) -> float | None:
"""Return the free GPU memory."""
if (value := getattr(data.gpu, f"{key}_memory_free", None)) is not None:
return round(value)
return None
return gpu.memory_free
def gpu_memory_used(data: SystemBridgeCoordinatorData, key: str) -> float | None:
@with_gpu
def gpu_memory_used(gpu: GPU) -> float | None:
"""Return the used GPU memory."""
if (value := getattr(data.gpu, f"{key}_memory_used", None)) is not None:
return round(value)
return None
return gpu.memory_used
def gpu_memory_used_percentage(
data: SystemBridgeCoordinatorData, key: str
) -> float | None:
@with_gpu
def gpu_memory_used_percentage(gpu: GPU) -> float | None:
"""Return the used GPU memory percentage."""
if ((used := getattr(data.gpu, f"{key}_memory_used", None)) is not None) and (
(total := getattr(data.gpu, f"{key}_memory_total", None)) is not None
):
return round(
used / total * 100,
2,
)
if (gpu.memory_used) is not None and (gpu.memory_total) is not None:
return round(gpu.memory_used / gpu.memory_total * 100, 2)
return None
@with_gpu
def gpu_power_usage(gpu: GPU) -> float | None:
"""Return the GPU power usage."""
return gpu.power_usage
@with_gpu
def gpu_temperature(gpu: GPU) -> float | None:
"""Return the GPU temperature."""
return gpu.temperature
@with_gpu
def gpu_usage_percentage(gpu: GPU) -> float | None:
"""Return the GPU usage percentage."""
return gpu.core_load
def memory_free(data: SystemBridgeCoordinatorData) -> float | None:
"""Return the free memory."""
if data.memory.virtual_free is not None:
return round(data.memory.virtual_free / 1000**3, 2)
if (virtual := data.memory.virtual) is not None and (
free := virtual.free
) is not None:
return round(free / 1000**3, 2)
return None
def memory_used(data: SystemBridgeCoordinatorData) -> float | None:
"""Return the used memory."""
if data.memory.virtual_used is not None:
return round(data.memory.virtual_used / 1000**3, 2)
if (virtual := data.memory.virtual) is not None and (
used := virtual.used
) is not None:
return round(used / 1000**3, 2)
return None
def partition_usage(
data: SystemBridgeCoordinatorData,
device_index: int,
partition_index: int,
) -> float | None:
"""Return the used memory."""
if (
(devices := data.disks.devices) is not None
and device_index < len(devices)
and (partitions := devices[device_index].partitions) is not None
and partition_index < len(partitions)
and (usage := partitions[partition_index].usage) is not None
):
return usage.percent
return None
@ -151,7 +241,7 @@ BASE_SENSOR_TYPES: tuple[SystemBridgeSensorEntityDescription, ...] = (
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
icon="mdi:chip",
value=cpu_power_package,
value=lambda data: data.cpu.power,
),
SystemBridgeSensorEntityDescription(
key="cpu_speed",
@ -197,15 +287,14 @@ BASE_SENSOR_TYPES: tuple[SystemBridgeSensorEntityDescription, ...] = (
),
SystemBridgeSensorEntityDescription(
key="memory_used_percentage",
translation_key="memory_used",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:memory",
value=lambda data: data.memory.virtual_percent,
value=lambda data: data.memory.virtual.percent,
),
SystemBridgeSensorEntityDescription(
key="memory_used",
translation_key="amount_memory_used",
translation_key="memory_used",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.GIGABYTES,
@ -224,7 +313,7 @@ BASE_SENSOR_TYPES: tuple[SystemBridgeSensorEntityDescription, ...] = (
translation_key="processes",
state_class=SensorStateClass.MEASUREMENT,
icon="mdi:counter",
value=lambda data: int(data.processes.count),
value=lambda data: len(data.processes),
),
SystemBridgeSensorEntityDescription(
key="processes_load",
@ -279,23 +368,27 @@ async def async_setup_entry(
SystemBridgeSensor(coordinator, description, entry.data[CONF_PORT])
)
for partition in coordinator.data.disk.partitions:
entities.append(
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"filesystem_{partition.replace(':', '')}",
name=f"{partition} space used",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:harddisk",
value=lambda data, p=partition: getattr(
data.disk, f"usage_{p}_percent", None
for index_device, device in enumerate(coordinator.data.disks.devices):
if device.partitions is None:
continue
for index_partition, partition in enumerate(device.partitions):
entities.append(
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"filesystem_{partition.mount_point.replace(':', '')}",
name=f"{partition.mount_point} space used",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:harddisk",
value=lambda data,
dk=index_device,
pk=index_partition: partition_usage(data, dk, pk),
),
),
entry.data[CONF_PORT],
entry.data[CONF_PORT],
)
)
)
if (
coordinator.data.battery
@ -307,20 +400,6 @@ async def async_setup_entry(
SystemBridgeSensor(coordinator, description, entry.data[CONF_PORT])
)
displays: list[dict[str, str]] = []
if coordinator.data.display.displays is not None:
displays.extend(
{
"key": display,
"name": getattr(coordinator.data.display, f"{display}_name").replace(
"Display ", ""
),
}
for display in coordinator.data.display.displays
if hasattr(coordinator.data.display, f"{display}_name")
)
display_count = len(displays)
entities.append(
SystemBridgeSensor(
coordinator,
@ -329,236 +408,213 @@ async def async_setup_entry(
translation_key="displays_connected",
state_class=SensorStateClass.MEASUREMENT,
icon="mdi:monitor",
value=lambda _, count=display_count: count,
value=lambda data: len(data.displays) if data.displays else None,
),
entry.data[CONF_PORT],
)
)
for _, display in enumerate(displays):
if coordinator.data.displays is not None:
for index, display in enumerate(coordinator.data.displays):
entities = [
*entities,
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"display_{display.id}_resolution_x",
name=f"Display {display.id} resolution x",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PIXELS,
icon="mdi:monitor",
value=lambda data, k=index: display_resolution_horizontal(
data, k
),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"display_{display.id}_resolution_y",
name=f"Display {display.id} resolution y",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PIXELS,
icon="mdi:monitor",
value=lambda data, k=index: display_resolution_vertical(
data, k
),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"display_{display.id}_refresh_rate",
name=f"Display {display.id} refresh rate",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.HERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:monitor",
value=lambda data, k=index: display_refresh_rate(data, k),
),
entry.data[CONF_PORT],
),
]
for index, gpu in enumerate(coordinator.data.gpus):
entities = [
*entities,
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"display_{display['name']}_resolution_x",
name=f"Display {display['name']} resolution x",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PIXELS,
icon="mdi:monitor",
value=lambda data, k=display["key"]: getattr(
data.display, f"{k}_resolution_horizontal", None
),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"display_{display['name']}_resolution_y",
name=f"Display {display['name']} resolution y",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PIXELS,
icon="mdi:monitor",
value=lambda data, k=display["key"]: getattr(
data.display, f"{k}_resolution_vertical", None
),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"display_{display['name']}_refresh_rate",
name=f"Display {display['name']} refresh rate",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.HERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:monitor",
value=lambda data, k=display["key"]: getattr(
data.display, f"{k}_refresh_rate", None
),
),
entry.data[CONF_PORT],
),
]
gpus: list[dict[str, str]] = []
if coordinator.data.gpu.gpus is not None:
gpus.extend(
{
"key": gpu,
"name": getattr(coordinator.data.gpu, f"{gpu}_name"),
}
for gpu in coordinator.data.gpu.gpus
if hasattr(coordinator.data.gpu, f"{gpu}_name")
)
for index, gpu in enumerate(gpus):
entities = [
*entities,
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_core_clock_speed",
name=f"{gpu['name']} clock speed",
key=f"gpu_{gpu.id}_core_clock_speed",
name=f"{gpu.name} clock speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.MEGAHERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:speedometer",
value=lambda data, k=gpu["key"]: gpu_core_clock_speed(data, k),
value=lambda data, k=index: gpu_core_clock_speed(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_memory_clock_speed",
name=f"{gpu['name']} memory clock speed",
key=f"gpu_{gpu.id}_memory_clock_speed",
name=f"{gpu.name} memory clock speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.MEGAHERTZ,
device_class=SensorDeviceClass.FREQUENCY,
icon="mdi:speedometer",
value=lambda data, k=gpu["key"]: gpu_memory_clock_speed(data, k),
value=lambda data, k=index: gpu_memory_clock_speed(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_memory_free",
name=f"{gpu['name']} memory free",
key=f"gpu_{gpu.id}_memory_free",
name=f"{gpu.name} memory free",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.GIGABYTES,
native_unit_of_measurement=UnitOfInformation.MEGABYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
value=lambda data, k=gpu["key"]: gpu_memory_free(data, k),
value=lambda data, k=index: gpu_memory_free(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_memory_used_percentage",
name=f"{gpu['name']} memory used %",
key=f"gpu_{gpu.id}_memory_used_percentage",
name=f"{gpu.name} memory used %",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:memory",
value=lambda data, k=gpu["key"]: gpu_memory_used_percentage(
data, k
),
value=lambda data, k=index: gpu_memory_used_percentage(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_memory_used",
name=f"{gpu['name']} memory used",
key=f"gpu_{gpu.id}_memory_used",
name=f"{gpu.name} memory used",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfInformation.GIGABYTES,
native_unit_of_measurement=UnitOfInformation.MEGABYTES,
device_class=SensorDeviceClass.DATA_SIZE,
icon="mdi:memory",
value=lambda data, k=gpu["key"]: gpu_memory_used(data, k),
value=lambda data, k=index: gpu_memory_used(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_fan_speed",
name=f"{gpu['name']} fan speed",
key=f"gpu_{gpu.id}_fan_speed",
name=f"{gpu.name} fan speed",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=REVOLUTIONS_PER_MINUTE,
icon="mdi:fan",
value=lambda data, k=gpu["key"]: getattr(
data.gpu, f"{k}_fan_speed", None
),
value=lambda data, k=index: gpu_fan_speed(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_power_usage",
name=f"{gpu['name']} power usage",
key=f"gpu_{gpu.id}_power_usage",
name=f"{gpu.name} power usage",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
value=lambda data, k=gpu["key"]: getattr(
data.gpu, f"{k}_power", None
),
value=lambda data, k=index: gpu_power_usage(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_temperature",
name=f"{gpu['name']} temperature",
key=f"gpu_{gpu.id}_temperature",
name=f"{gpu.name} temperature",
entity_registry_enabled_default=False,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
value=lambda data, k=gpu["key"]: getattr(
data.gpu, f"{k}_temperature", None
),
value=lambda data, k=index: gpu_temperature(data, k),
),
entry.data[CONF_PORT],
),
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"gpu_{index}_usage_percentage",
name=f"{gpu['name']} usage %",
key=f"gpu_{gpu.id}_usage_percentage",
name=f"{gpu.name} usage %",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:percent",
value=lambda data, k=gpu["key"]: getattr(
data.gpu, f"{k}_core_load", None
),
value=lambda data, k=index: gpu_usage_percentage(data, k),
),
entry.data[CONF_PORT],
),
]
for index in range(coordinator.data.cpu.count):
entities.append(
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"processes_load_cpu_{index}",
name=f"Load CPU {index}",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:percent",
value=lambda data, k=index: getattr(data.cpu, f"usage_{k}", None),
),
entry.data[CONF_PORT],
)
)
if hasattr(coordinator.data.cpu, f"power_per_cpu_{index}"):
entities.append(
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"cpu_power_core_{index}",
name=f"CPU Core {index} Power",
entity_registry_enabled_default=False,
native_unit_of_measurement=UnitOfPower.WATT,
state_class=SensorStateClass.MEASUREMENT,
suggested_display_precision=2,
icon="mdi:chip",
value=lambda data, k=index: cpu_power_per_cpu(data, k),
if coordinator.data.cpu.per_cpu is not None:
for cpu in coordinator.data.cpu.per_cpu:
entities.extend(
[
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"processes_load_cpu_{cpu.id}",
name=f"Load CPU {cpu.id}",
entity_registry_enabled_default=False,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
icon="mdi:percent",
value=lambda data, k=cpu.id: cpu_usage_per_cpu(data, k),
),
entry.data[CONF_PORT],
),
entry.data[CONF_PORT],
)
SystemBridgeSensor(
coordinator,
SystemBridgeSensorEntityDescription(
key=f"cpu_power_core_{cpu.id}",
name=f"CPU Core {cpu.id} Power",
entity_registry_enabled_default=False,
native_unit_of_measurement=UnitOfPower.WATT,
state_class=SensorStateClass.MEASUREMENT,
icon="mdi:chip",
value=lambda data, k=cpu.id: cpu_power_per_cpu(data, k),
),
entry.data[CONF_PORT],
),
]
)
async_add_entities(entities)

View File

@ -3,21 +3,22 @@
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
"unsupported_version": "Your version of System Bridge is not supported. Please upgrade to the latest version.",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"flow_title": "{name}",
"step": {
"authenticate": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]"
"token": "[%key:common::config_flow::data::api_token%]"
},
"description": "Please enter the API Key you set in your configuration for {name}."
"description": "Please enter the token set in your configuration for {name}."
},
"user": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]",
"host": "[%key:common::config_flow::data::host%]",
"port": "[%key:common::config_flow::data::port%]"
"port": "[%key:common::config_flow::data::port%]",
"token": "Token"
},
"description": "Please enter your connection details."
}
@ -85,6 +86,12 @@
}
}
},
"issues": {
"unsupported_version": {
"title": "System Bridge Upgrade Required",
"description": "Your version of System Bridge for host {host} is not supported.\n\nPlease upgrade to the latest version."
}
},
"services": {
"open_path": {
"name": "Open path",

View File

@ -2647,7 +2647,7 @@ switchbot-api==2.0.0
synology-srm==0.2.0
# homeassistant.components.system_bridge
systembridgeconnector==3.10.0
systembridgeconnector==4.0.1
# homeassistant.components.tailscale
tailscale==0.6.0

View File

@ -2033,7 +2033,7 @@ surepy==0.9.0
switchbot-api==2.0.0
# homeassistant.components.system_bridge
systembridgeconnector==3.10.0
systembridgeconnector==4.0.1
# homeassistant.components.tailscale
tailscale==0.6.0

View File

@ -1,5 +1,8 @@
"""Test the System Bridge config flow."""
from collections.abc import Awaitable, Callable
from dataclasses import asdict
from ipaddress import ip_address
from typing import Any
from unittest.mock import patch
from systembridgeconnector.const import TYPE_DATA_UPDATE
@ -9,13 +12,14 @@ from systembridgeconnector.exceptions import (
ConnectionErrorException,
)
from systembridgemodels.const import MODEL_SYSTEM
from systembridgemodels.modules.system import System
from systembridgemodels.response import Response
from systembridgemodels.system import LastUpdated, System
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import zeroconf
from homeassistant.components.system_bridge.config_flow import SystemBridgeConfigFlow
from homeassistant.components.system_bridge.const import DOMAIN
from homeassistant.const import CONF_API_KEY, CONF_HOST, CONF_PORT
from homeassistant.const import CONF_API_KEY, CONF_HOST, CONF_PORT, CONF_TOKEN
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
@ -23,16 +27,16 @@ from tests.common import MockConfigEntry
FIXTURE_MAC_ADDRESS = "aa:bb:cc:dd:ee:ff"
FIXTURE_UUID = "e91bf575-56f3-4c83-8f42-70ac17adcd33"
FIXTURE_AUTH_INPUT = {CONF_API_KEY: "abc-123-def-456-ghi"}
FIXTURE_AUTH_INPUT = {CONF_TOKEN: "abc-123-def-456-ghi"}
FIXTURE_USER_INPUT = {
CONF_API_KEY: "abc-123-def-456-ghi",
CONF_TOKEN: "abc-123-def-456-ghi",
CONF_HOST: "test-bridge",
CONF_PORT: "9170",
}
FIXTURE_ZEROCONF_INPUT = {
CONF_API_KEY: "abc-123-def-456-ghi",
CONF_TOKEN: "abc-123-def-456-ghi",
CONF_HOST: "1.1.1.1",
CONF_PORT: "9170",
}
@ -69,7 +73,6 @@ FIXTURE_ZEROCONF_BAD = zeroconf.ZeroconfServiceInfo(
FIXTURE_SYSTEM = System(
id=FIXTURE_UUID,
boot_time=1,
fqdn="",
hostname="1.1.1.1",
@ -82,20 +85,7 @@ FIXTURE_SYSTEM = System(
version="",
version_latest="",
version_newer_available=False,
last_updated=LastUpdated(
boot_time=1,
fqdn=1,
hostname=1,
ip_address_4=1,
mac_address=1,
platform=1,
platform_version=1,
uptime=1,
uuid=1,
version=1,
version_latest=1,
version_newer_available=1,
),
users=[],
)
FIXTURE_DATA_RESPONSE = Response(
@ -104,7 +94,7 @@ FIXTURE_DATA_RESPONSE = Response(
subtype=None,
message="Data received",
module=MODEL_SYSTEM,
data=FIXTURE_SYSTEM,
data=asdict(FIXTURE_SYSTEM),
)
FIXTURE_DATA_RESPONSE_BAD = Response(
@ -126,6 +116,17 @@ FIXTURE_DATA_RESPONSE_BAD = Response(
)
async def mock_data_listener(
self,
callback: Callable[[str, Any], Awaitable[None]] | None = None,
_: bool = False,
):
"""Mock websocket data listener."""
if callback is not None:
# Simulate data received from the websocket
await callback(MODEL_SYSTEM, FIXTURE_SYSTEM)
async def test_show_user_form(hass: HomeAssistant) -> None:
"""Test that the setup form is served."""
result = await hass.config_entries.flow.async_init(
@ -152,6 +153,7 @@ async def test_user_flow(hass: HomeAssistant) -> None:
return_value=FIXTURE_DATA_RESPONSE,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
), patch(
"homeassistant.components.system_bridge.async_setup_entry",
return_value=True,
@ -206,6 +208,7 @@ async def test_form_connection_closed_cannot_connect(hass: HomeAssistant) -> Non
side_effect=ConnectionClosedException,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT
@ -233,6 +236,7 @@ async def test_form_timeout_cannot_connect(hass: HomeAssistant) -> None:
side_effect=TimeoutError,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT
@ -260,6 +264,7 @@ async def test_form_invalid_auth(hass: HomeAssistant) -> None:
side_effect=AuthenticationException,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT
@ -287,33 +292,7 @@ async def test_form_uuid_error(hass: HomeAssistant) -> None:
side_effect=ValueError,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT
)
await hass.async_block_till_done()
assert result2["type"] == data_entry_flow.FlowResultType.FORM
assert result2["step_id"] == "user"
assert result2["errors"] == {"base": "cannot_connect"}
async def test_form_value_error(hass: HomeAssistant) -> None:
"""Test we handle error from bad value."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.FlowResultType.FORM
assert result["errors"] is None
with patch(
"homeassistant.components.system_bridge.config_flow.WebSocketClient.connect"
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.get_data",
return_value=FIXTURE_DATA_RESPONSE_BAD,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT
@ -341,6 +320,7 @@ async def test_form_unknown_error(hass: HomeAssistant) -> None:
side_effect=Exception,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_USER_INPUT
@ -368,6 +348,7 @@ async def test_reauth_authorization_error(hass: HomeAssistant) -> None:
side_effect=AuthenticationException,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_AUTH_INPUT
@ -401,6 +382,24 @@ async def test_reauth_connection_error(hass: HomeAssistant) -> None:
assert result2["step_id"] == "authenticate"
assert result2["errors"] == {"base": "cannot_connect"}
with patch(
"systembridgeconnector.websocket_client.WebSocketClient.connect",
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.get_data",
return_value=None,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_AUTH_INPUT
)
await hass.async_block_till_done()
assert result3["type"] == data_entry_flow.FlowResultType.FORM
assert result3["step_id"] == "authenticate"
assert result3["errors"] == {"base": "cannot_connect"}
async def test_reauth_connection_closed_error(hass: HomeAssistant) -> None:
"""Test we show user form on connection error."""
@ -418,6 +417,7 @@ async def test_reauth_connection_closed_error(hass: HomeAssistant) -> None:
side_effect=ConnectionClosedException,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_AUTH_INPUT
@ -450,10 +450,11 @@ async def test_reauth_flow(hass: HomeAssistant) -> None:
return_value=FIXTURE_DATA_RESPONSE,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
), patch(
"homeassistant.components.system_bridge.async_setup_entry",
return_value=True,
) as mock_setup_entry:
):
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"], FIXTURE_AUTH_INPUT
)
@ -462,8 +463,6 @@ async def test_reauth_flow(hass: HomeAssistant) -> None:
assert result2["type"] == data_entry_flow.FlowResultType.ABORT
assert result2["reason"] == "reauth_successful"
assert len(mock_setup_entry.mock_calls) == 1
async def test_zeroconf_flow(hass: HomeAssistant) -> None:
"""Test zeroconf flow."""
@ -484,6 +483,7 @@ async def test_zeroconf_flow(hass: HomeAssistant) -> None:
return_value=FIXTURE_DATA_RESPONSE,
), patch(
"systembridgeconnector.websocket_client.WebSocketClient.listen",
new=mock_data_listener,
), patch(
"homeassistant.components.system_bridge.async_setup_entry",
return_value=True,
@ -536,3 +536,28 @@ async def test_zeroconf_bad_zeroconf_info(hass: HomeAssistant) -> None:
assert result["type"] == data_entry_flow.FlowResultType.ABORT
assert result["reason"] == "unknown"
async def test_migration(hass: HomeAssistant) -> None:
"""Test migration from system_bridge to system_bridge."""
config_entry = MockConfigEntry(
domain=DOMAIN,
unique_id=FIXTURE_UUID,
data={
CONF_API_KEY: FIXTURE_USER_INPUT[CONF_TOKEN],
CONF_HOST: FIXTURE_USER_INPUT[CONF_HOST],
CONF_PORT: FIXTURE_USER_INPUT[CONF_PORT],
},
version=1,
minor_version=1,
)
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
# Check that the version has been updated and the api_key has been moved to token
assert config_entry.version == SystemBridgeConfigFlow.VERSION
assert config_entry.minor_version == SystemBridgeConfigFlow.MINOR_VERSION
assert CONF_API_KEY not in config_entry.data
assert config_entry.data[CONF_TOKEN] == FIXTURE_USER_INPUT[CONF_TOKEN]
assert config_entry.data == FIXTURE_USER_INPUT