mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 13:17:32 +00:00
Refactor ZHA core channel initialization (#43953)
* Cleanup Basic channnel Remove unused methods. * Refactor async_configure() method Split async_configure() into async_configure() and async_configure_channel_specfici() * Refactor async_initilize() method Split into two different methods and configure channel specifics via async_configure_channel_specific() * Fixes
This commit is contained in:
parent
b19c705867
commit
125ceb7449
@ -73,13 +73,9 @@ class BinarySensor(ZhaEntity, BinarySensorEntity):
|
|||||||
self._channel = channels[0]
|
self._channel = channels[0]
|
||||||
self._device_class = self.DEVICE_CLASS
|
self._device_class = self.DEVICE_CLASS
|
||||||
|
|
||||||
async def get_device_class(self):
|
|
||||||
"""Get the HA device class from the channel."""
|
|
||||||
|
|
||||||
async def async_added_to_hass(self):
|
async def async_added_to_hass(self):
|
||||||
"""Run when about to be added to hass."""
|
"""Run when about to be added to hass."""
|
||||||
await super().async_added_to_hass()
|
await super().async_added_to_hass()
|
||||||
await self.get_device_class()
|
|
||||||
self.async_accept_signal(
|
self.async_accept_signal(
|
||||||
self._channel, SIGNAL_ATTR_UPDATED, self.async_set_state
|
self._channel, SIGNAL_ATTR_UPDATED, self.async_set_state
|
||||||
)
|
)
|
||||||
@ -168,10 +164,10 @@ class IASZone(BinarySensor):
|
|||||||
|
|
||||||
SENSOR_ATTR = "zone_status"
|
SENSOR_ATTR = "zone_status"
|
||||||
|
|
||||||
async def get_device_class(self) -> None:
|
@property
|
||||||
"""Get the HA device class from the channel."""
|
def device_class(self) -> str:
|
||||||
zone_type = await self._channel.get_attribute_value("zone_type")
|
"""Return device class from component DEVICE_CLASSES."""
|
||||||
self._device_class = CLASS_MAPPING.get(zone_type)
|
return CLASS_MAPPING.get(self._channel.cluster.get("zone_type"))
|
||||||
|
|
||||||
async def async_update(self):
|
async def async_update(self):
|
||||||
"""Attempt to retrieve on off state from the binary sensor."""
|
"""Attempt to retrieve on off state from the binary sensor."""
|
||||||
|
@ -187,29 +187,36 @@ class ZigbeeChannel(LogMixin):
|
|||||||
str(ex),
|
str(ex),
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_configure(self):
|
async def async_configure(self) -> None:
|
||||||
"""Set cluster binding and attribute reporting."""
|
"""Set cluster binding and attribute reporting."""
|
||||||
if not self._ch_pool.skip_configuration:
|
if not self._ch_pool.skip_configuration:
|
||||||
await self.bind()
|
await self.bind()
|
||||||
if self.cluster.is_server:
|
if self.cluster.is_server:
|
||||||
await self.configure_reporting()
|
await self.configure_reporting()
|
||||||
|
ch_specific_cfg = getattr(self, "async_configure_channel_specific", None)
|
||||||
|
if ch_specific_cfg:
|
||||||
|
await ch_specific_cfg()
|
||||||
self.debug("finished channel configuration")
|
self.debug("finished channel configuration")
|
||||||
else:
|
else:
|
||||||
self.debug("skipping channel configuration")
|
self.debug("skipping channel configuration")
|
||||||
self._status = ChannelStatus.CONFIGURED
|
self._status = ChannelStatus.CONFIGURED
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
async def async_initialize(self, from_cache: bool) -> None:
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
if not from_cache and self._ch_pool.skip_configuration:
|
if not from_cache and self._ch_pool.skip_configuration:
|
||||||
self._status = ChannelStatus.INITIALIZED
|
self._status = ChannelStatus.INITIALIZED
|
||||||
return
|
return
|
||||||
|
|
||||||
self.debug("initializing channel: from_cache: %s", from_cache)
|
self.debug("initializing channel: from_cache: %s", from_cache)
|
||||||
attributes = []
|
attributes = [cfg["attr"] for cfg in self._report_config]
|
||||||
for report_config in self._report_config:
|
|
||||||
attributes.append(report_config["attr"])
|
|
||||||
if attributes:
|
if attributes:
|
||||||
await self.get_attributes(attributes, from_cache=from_cache)
|
await self.get_attributes(attributes, from_cache=from_cache)
|
||||||
|
|
||||||
|
ch_specific_init = getattr(self, "async_initialize_channel_specific", None)
|
||||||
|
if ch_specific_init:
|
||||||
|
await ch_specific_init(from_cache=from_cache)
|
||||||
|
|
||||||
|
self.debug("finished channel configuration")
|
||||||
self._status = ChannelStatus.INITIALIZED
|
self._status = ChannelStatus.INITIALIZED
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
|
@ -35,11 +35,6 @@ class DoorLockChannel(ZigbeeChannel):
|
|||||||
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
|
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
|
||||||
"""Initialize channel."""
|
|
||||||
await self.get_attribute_value(self._value_attribute, from_cache=from_cache)
|
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
|
|
||||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(closures.Shade.cluster_id)
|
@registries.ZIGBEE_CHANNEL_REGISTRY.register(closures.Shade.cluster_id)
|
||||||
class Shade(ZigbeeChannel):
|
class Shade(ZigbeeChannel):
|
||||||
@ -85,8 +80,3 @@ class WindowCovering(ZigbeeChannel):
|
|||||||
self.async_send_signal(
|
self.async_send_signal(
|
||||||
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
|
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
|
||||||
"""Initialize channel."""
|
|
||||||
await self.get_attribute_value(self._value_attribute, from_cache=from_cache)
|
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
"""General channels module for Zigbee Home Automation."""
|
"""General channels module for Zigbee Home Automation."""
|
||||||
import asyncio
|
import asyncio
|
||||||
from typing import Any, List, Optional
|
from typing import Any, Coroutine, List, Optional
|
||||||
|
|
||||||
import zigpy.exceptions
|
import zigpy.exceptions
|
||||||
import zigpy.zcl.clusters.general as general
|
import zigpy.zcl.clusters.general as general
|
||||||
@ -19,7 +19,7 @@ from ..const import (
|
|||||||
SIGNAL_SET_LEVEL,
|
SIGNAL_SET_LEVEL,
|
||||||
SIGNAL_UPDATE_DEVICE,
|
SIGNAL_UPDATE_DEVICE,
|
||||||
)
|
)
|
||||||
from .base import ChannelStatus, ClientChannel, ZigbeeChannel, parse_and_log_command
|
from .base import ClientChannel, ZigbeeChannel, parse_and_log_command
|
||||||
|
|
||||||
|
|
||||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.Alarms.cluster_id)
|
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.Alarms.cluster_id)
|
||||||
@ -71,21 +71,6 @@ class BasicChannel(ZigbeeChannel):
|
|||||||
6: "Emergency mains and transfer switch",
|
6: "Emergency mains and transfer switch",
|
||||||
}
|
}
|
||||||
|
|
||||||
async def async_configure(self):
|
|
||||||
"""Configure this channel."""
|
|
||||||
await super().async_configure()
|
|
||||||
await self.async_initialize(False)
|
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
|
||||||
"""Initialize channel."""
|
|
||||||
if not self._ch_pool.skip_configuration or from_cache:
|
|
||||||
await self.get_attribute_value("power_source", from_cache=from_cache)
|
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
def get_power_source(self):
|
|
||||||
"""Get the power source."""
|
|
||||||
return self.cluster.get("power_source")
|
|
||||||
|
|
||||||
|
|
||||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.BinaryInput.cluster_id)
|
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.BinaryInput.cluster_id)
|
||||||
class BinaryInput(ZigbeeChannel):
|
class BinaryInput(ZigbeeChannel):
|
||||||
@ -189,11 +174,6 @@ class LevelControlChannel(ZigbeeChannel):
|
|||||||
"""Dispatch level change."""
|
"""Dispatch level change."""
|
||||||
self.async_send_signal(f"{self.unique_id}_{command}", level)
|
self.async_send_signal(f"{self.unique_id}_{command}", level)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
|
||||||
"""Initialize channel."""
|
|
||||||
await self.get_attribute_value(self.CURRENT_LEVEL, from_cache=from_cache)
|
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
|
|
||||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.MultistateInput.cluster_id)
|
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.MultistateInput.cluster_id)
|
||||||
class MultistateInput(ZigbeeChannel):
|
class MultistateInput(ZigbeeChannel):
|
||||||
@ -284,12 +264,9 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
)
|
)
|
||||||
self._state = bool(value)
|
self._state = bool(value)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
async def async_initialize_channel_specific(self, from_cache: bool) -> None:
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
await super().async_initialize(from_cache)
|
self._state = self.on_off
|
||||||
state = await self.get_attribute_value(self.ON_OFF, from_cache=True)
|
|
||||||
if state is not None:
|
|
||||||
self._state = bool(state)
|
|
||||||
|
|
||||||
async def async_update(self):
|
async def async_update(self):
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
@ -338,7 +315,7 @@ class PollControl(ZigbeeChannel):
|
|||||||
CHECKIN_FAST_POLL_TIMEOUT = 2 * 4 # 2s
|
CHECKIN_FAST_POLL_TIMEOUT = 2 * 4 # 2s
|
||||||
LONG_POLL = 6 * 4 # 6s
|
LONG_POLL = 6 * 4 # 6s
|
||||||
|
|
||||||
async def async_configure(self) -> None:
|
async def async_configure_channel_specific(self) -> None:
|
||||||
"""Configure channel: set check-in interval."""
|
"""Configure channel: set check-in interval."""
|
||||||
try:
|
try:
|
||||||
res = await self.cluster.write_attributes(
|
res = await self.cluster.write_attributes(
|
||||||
@ -347,7 +324,6 @@ class PollControl(ZigbeeChannel):
|
|||||||
self.debug("%ss check-in interval set: %s", self.CHECKIN_INTERVAL / 4, res)
|
self.debug("%ss check-in interval set: %s", self.CHECKIN_INTERVAL / 4, res)
|
||||||
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex:
|
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex:
|
||||||
self.debug("Couldn't set check-in interval: %s", ex)
|
self.debug("Couldn't set check-in interval: %s", ex)
|
||||||
await super().async_configure()
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def cluster_command(
|
def cluster_command(
|
||||||
@ -375,16 +351,13 @@ class PowerConfigurationChannel(ZigbeeChannel):
|
|||||||
{"attr": "battery_percentage_remaining", "config": REPORT_CONFIG_BATTERY_SAVE},
|
{"attr": "battery_percentage_remaining", "config": REPORT_CONFIG_BATTERY_SAVE},
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||||
"""Initialize channel."""
|
"""Initialize channel specific attrs."""
|
||||||
attributes = [
|
attributes = [
|
||||||
"battery_size",
|
"battery_size",
|
||||||
"battery_percentage_remaining",
|
|
||||||
"battery_voltage",
|
|
||||||
"battery_quantity",
|
"battery_quantity",
|
||||||
]
|
]
|
||||||
await self.get_attributes(attributes, from_cache=from_cache)
|
return self.get_attributes(attributes, from_cache=from_cache)
|
||||||
self._status = ChannelStatus.INITIALIZED
|
|
||||||
|
|
||||||
|
|
||||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.PowerProfile.cluster_id)
|
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.PowerProfile.cluster_id)
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
"""Home automation channels module for Zigbee Home Automation."""
|
"""Home automation channels module for Zigbee Home Automation."""
|
||||||
from typing import Optional
|
from typing import Coroutine, Optional
|
||||||
|
|
||||||
import zigpy.zcl.clusters.homeautomation as homeautomation
|
import zigpy.zcl.clusters.homeautomation as homeautomation
|
||||||
|
|
||||||
@ -62,23 +62,17 @@ class ElectricalMeasurementChannel(ZigbeeChannel):
|
|||||||
result,
|
result,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||||
"""Initialize channel."""
|
"""Initialize channel specific attributes."""
|
||||||
await self.fetch_config(True)
|
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
async def fetch_config(self, from_cache):
|
return self.get_attributes(
|
||||||
"""Fetch config from device and updates format specifier."""
|
|
||||||
|
|
||||||
# prime the cache
|
|
||||||
await self.get_attributes(
|
|
||||||
[
|
[
|
||||||
"ac_power_divisor",
|
"ac_power_divisor",
|
||||||
"power_divisor",
|
"power_divisor",
|
||||||
"ac_power_multiplier",
|
"ac_power_multiplier",
|
||||||
"power_multiplier",
|
"power_multiplier",
|
||||||
],
|
],
|
||||||
from_cache=from_cache,
|
from_cache=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -362,7 +362,7 @@ class ThermostatChannel(ZigbeeChannel):
|
|||||||
)
|
)
|
||||||
|
|
||||||
@retryable_req(delays=(1, 1, 3))
|
@retryable_req(delays=(1, 1, 3))
|
||||||
async def async_initialize(self, from_cache):
|
async def async_initialize_channel_specific(self, from_cache: bool) -> None:
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
|
|
||||||
cached = [a for a, cached in self._init_attrs.items() if cached]
|
cached = [a for a, cached in self._init_attrs.items() if cached]
|
||||||
@ -370,7 +370,6 @@ class ThermostatChannel(ZigbeeChannel):
|
|||||||
|
|
||||||
await self._chunk_attr_read(cached, cached=True)
|
await self._chunk_attr_read(cached, cached=True)
|
||||||
await self._chunk_attr_read(uncached, cached=False)
|
await self._chunk_attr_read(uncached, cached=False)
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
async def async_set_operation_mode(self, mode) -> bool:
|
async def async_set_operation_mode(self, mode) -> bool:
|
||||||
"""Set Operation mode."""
|
"""Set Operation mode."""
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
"""Lighting channels module for Zigbee Home Automation."""
|
"""Lighting channels module for Zigbee Home Automation."""
|
||||||
from typing import Optional
|
from typing import Coroutine, Optional
|
||||||
|
|
||||||
import zigpy.zcl.clusters.lighting as lighting
|
import zigpy.zcl.clusters.lighting as lighting
|
||||||
|
|
||||||
@ -75,15 +75,13 @@ class ColorChannel(ZigbeeChannel):
|
|||||||
"""Return the warmest color_temp that this channel supports."""
|
"""Return the warmest color_temp that this channel supports."""
|
||||||
return self.cluster.get("color_temp_physical_max", self.MAX_MIREDS)
|
return self.cluster.get("color_temp_physical_max", self.MAX_MIREDS)
|
||||||
|
|
||||||
async def async_configure(self) -> None:
|
def async_configure_channel_specific(self) -> Coroutine:
|
||||||
"""Configure channel."""
|
"""Configure channel."""
|
||||||
await self.fetch_color_capabilities(False)
|
return self.fetch_color_capabilities(False)
|
||||||
await super().async_configure()
|
|
||||||
|
|
||||||
async def async_initialize(self, from_cache: bool) -> None:
|
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
await self.fetch_color_capabilities(True)
|
return self.fetch_color_capabilities(True)
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
async def fetch_color_capabilities(self, from_cache: bool) -> None:
|
async def fetch_color_capabilities(self, from_cache: bool) -> None:
|
||||||
"""Get the color configuration."""
|
"""Get the color configuration."""
|
||||||
|
@ -5,6 +5,7 @@ For more details about this component, please refer to the documentation at
|
|||||||
https://home-assistant.io/integrations/zha/
|
https://home-assistant.io/integrations/zha/
|
||||||
"""
|
"""
|
||||||
import asyncio
|
import asyncio
|
||||||
|
from typing import Coroutine
|
||||||
|
|
||||||
from zigpy.exceptions import ZigbeeException
|
from zigpy.exceptions import ZigbeeException
|
||||||
import zigpy.zcl.clusters.security as security
|
import zigpy.zcl.clusters.security as security
|
||||||
@ -20,7 +21,7 @@ from ..const import (
|
|||||||
WARNING_DEVICE_STROBE_HIGH,
|
WARNING_DEVICE_STROBE_HIGH,
|
||||||
WARNING_DEVICE_STROBE_YES,
|
WARNING_DEVICE_STROBE_YES,
|
||||||
)
|
)
|
||||||
from .base import ZigbeeChannel
|
from .base import ChannelStatus, ZigbeeChannel
|
||||||
|
|
||||||
|
|
||||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(security.IasAce.cluster_id)
|
@registries.ZIGBEE_CHANNEL_REGISTRY.register(security.IasAce.cluster_id)
|
||||||
@ -155,14 +156,10 @@ class IASZoneChannel(ZigbeeChannel):
|
|||||||
str(ex),
|
str(ex),
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
|
||||||
self.debug("Sending pro-active IAS enroll response")
|
self.debug("Sending pro-active IAS enroll response")
|
||||||
await self._cluster.enroll_response(0, 0)
|
self._cluster.create_catching_task(self._cluster.enroll_response(0, 0))
|
||||||
except ZigbeeException as ex:
|
|
||||||
self.debug(
|
self._status = ChannelStatus.CONFIGURED
|
||||||
"Failed to send pro-active IAS enroll response: %s",
|
|
||||||
str(ex),
|
|
||||||
)
|
|
||||||
self.debug("finished IASZoneChannel configuration")
|
self.debug("finished IASZoneChannel configuration")
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
@ -177,8 +174,7 @@ class IASZoneChannel(ZigbeeChannel):
|
|||||||
value,
|
value,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def async_initialize(self, from_cache):
|
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
attributes = ["zone_status", "zone_state"]
|
attributes = ["zone_status", "zone_state", "zone_type"]
|
||||||
await self.get_attributes(attributes, from_cache=from_cache)
|
return self.get_attributes(attributes, from_cache=from_cache)
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
"""Smart energy channels module for Zigbee Home Automation."""
|
"""Smart energy channels module for Zigbee Home Automation."""
|
||||||
from typing import Union
|
from typing import Coroutine, Union
|
||||||
|
|
||||||
import zigpy.zcl.clusters.smartenergy as smartenergy
|
import zigpy.zcl.clusters.smartenergy as smartenergy
|
||||||
|
|
||||||
@ -96,15 +96,13 @@ class Metering(ZigbeeChannel):
|
|||||||
"""Return multiplier for the value."""
|
"""Return multiplier for the value."""
|
||||||
return self.cluster.get("multiplier")
|
return self.cluster.get("multiplier")
|
||||||
|
|
||||||
async def async_configure(self) -> None:
|
def async_configure_channel_specific(self) -> Coroutine:
|
||||||
"""Configure channel."""
|
"""Configure channel."""
|
||||||
await self.fetch_config(False)
|
return self.fetch_config(False)
|
||||||
await super().async_configure()
|
|
||||||
|
|
||||||
async def async_initialize(self, from_cache: bool) -> None:
|
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
await self.fetch_config(True)
|
return self.fetch_config(True)
|
||||||
await super().async_initialize(from_cache)
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def attribute_updated(self, attrid: int, value: int) -> None:
|
def attribute_updated(self, attrid: int, value: int) -> None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user