mirror of
https://github.com/home-assistant/core.git
synced 2025-04-27 10:47:51 +00:00

* Support DPTComplex objects and validate sensor types
* Gracefully start and stop xknx device objects
* Use non-awaitable XknxDevice callbacks
* Use non-awaitable xknx.TelegramQueue callbacks
* Use non-awaitable xknx.ConnectionManager callbacks
* Remove unnecessary `hass.async_block_till_done()` calls
* Wait for StateUpdater logic to proceed when receiving responses
* Update import module paths for specific DPTs
* Support Enum data types
* New HVAC mode names
* HVAC Enums instead of Enum member value strings
* New date and time devices
* Update xknx to 3.0.0
* Fix expose tests and DPTEnumData check
* ruff and mypy fixes
54 lines
1.9 KiB
Python
54 lines
1.9 KiB
Python
"""Handle KNX Devices."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from xknx import XKNX
|
|
from xknx.core import XknxConnectionState
|
|
from xknx.io.gateway_scanner import GatewayDescriptor
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import device_registry as dr
|
|
from homeassistant.helpers.device_registry import DeviceInfo
|
|
|
|
from .const import DOMAIN
|
|
|
|
|
|
class KNXInterfaceDevice:
    """Class for KNX Interface Device handling.

    Creates (or fetches) a "KNX Interface" entry in the device registry for
    the given config entry and refreshes that entry's model from the gateway
    information each time the KNX connection reports ``CONNECTED``.
    """

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry, xknx: XKNX) -> None:
        """Initialize interface device class.

        Args:
            hass: Home Assistant instance; used to reach the device registry
                and to schedule the update task.
            entry: Config entry whose ``entry_id`` is used for the device
                identifier and as the owning config entry of the device.
            xknx: XKNX instance whose connection manager notifies this object
                about connection state changes.
        """
        self.hass = hass
        self.device_registry = dr.async_get(hass)
        # Populated by `update()`; stays None until a first update has run.
        self.gateway_descriptor: GatewayDescriptor | None = None
        self.xknx = xknx

        _device_id = (DOMAIN, f"_{entry.entry_id}_interface")
        self.device = self.device_registry.async_get_or_create(
            config_entry_id=entry.entry_id,
            name="KNX Interface",
            identifiers={_device_id},
        )
        # DeviceInfo carrying the same identifier as the registry entry above.
        self.device_info = DeviceInfo(identifiers={_device_id})

        self.xknx.connection_manager.register_connection_state_changed_cb(
            self.connection_state_changed_cb
        )

    async def update(self) -> None:
        """Update interface properties on new connection.

        Stores the result of ``knxip_interface.gateway_info()`` in
        ``gateway_descriptor`` and writes its name as the device model to the
        device registry. A falsy descriptor sets the model to ``None``.
        """
        self.gateway_descriptor = await self.xknx.knxip_interface.gateway_info()

        self.device_registry.async_update_device(
            device_id=self.device.id,
            model=str(self.gateway_descriptor.name)
            if self.gateway_descriptor
            else None,
        )

    def connection_state_changed_cb(self, state: XknxConnectionState) -> None:
        """Call invoked after a KNX connection state change was received.

        Only ``XknxConnectionState.CONNECTED`` triggers work: `update()` is
        scheduled as a task on the Home Assistant instance. Every other state
        is ignored.
        """
        if state is XknxConnectionState.CONNECTED:
            self.hass.async_create_task(self.update())
|