mirror of
https://github.com/home-assistant/core.git
synced 2025-07-22 04:37:06 +00:00
Refactor ZHA Zigbee Cluster report configuration (#25589)
* Move ZCL report configs to ZHA core channels. * Refactor ZCL report configuratopm and cluster binding. * Tests for ZHA channel configuration. * Update tests. * Remove INPUT_BIND_ONLY_CLUSTER ZHA core registry. We always need bind a cluster, but not always need to configure attribute reporting. No reporting is done on ZCL "client" clusters. * Lint * Black
This commit is contained in:
parent
03aec33f9e
commit
f7cfe908f7
@ -18,17 +18,12 @@ from ..const import (
|
||||
CHANNEL_ATTRIBUTE,
|
||||
CHANNEL_EVENT_RELAY,
|
||||
CHANNEL_ZDO,
|
||||
REPORT_CONFIG_DEFAULT,
|
||||
REPORT_CONFIG_MAX_INT,
|
||||
REPORT_CONFIG_MIN_INT,
|
||||
REPORT_CONFIG_RPT_CHANGE,
|
||||
SIGNAL_ATTR_UPDATED,
|
||||
)
|
||||
from ..helpers import (
|
||||
LogMixin,
|
||||
bind_cluster,
|
||||
configure_reporting,
|
||||
construct_unique_id,
|
||||
get_attr_id_by_name,
|
||||
safe_read,
|
||||
)
|
||||
from ..helpers import LogMixin, construct_unique_id, get_attr_id_by_name, safe_read
|
||||
from ..registries import CLUSTER_REPORT_CONFIGS
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -84,6 +79,7 @@ class ZigbeeChannel(LogMixin):
|
||||
"""Base channel for a Zigbee cluster."""
|
||||
|
||||
CHANNEL_NAME = None
|
||||
REPORT_CONFIG = ()
|
||||
|
||||
def __init__(self, cluster, device):
|
||||
"""Initialize ZigbeeChannel."""
|
||||
@ -95,7 +91,7 @@ class ZigbeeChannel(LogMixin):
|
||||
self._zha_device = device
|
||||
self._unique_id = construct_unique_id(cluster)
|
||||
self._report_config = CLUSTER_REPORT_CONFIGS.get(
|
||||
self._cluster.cluster_id, [{"attr": 0, "config": REPORT_CONFIG_DEFAULT}]
|
||||
self._cluster.cluster_id, self.REPORT_CONFIG
|
||||
)
|
||||
self._status = ChannelStatus.CREATED
|
||||
self._cluster.add_listener(self)
|
||||
@ -134,29 +130,75 @@ class ZigbeeChannel(LogMixin):
|
||||
"""Set the reporting configuration."""
|
||||
self._report_config = report_config
|
||||
|
||||
async def bind(self):
|
||||
"""Bind a zigbee cluster.
|
||||
|
||||
This also swallows DeliveryError exceptions that are thrown when
|
||||
devices are unreachable.
|
||||
"""
|
||||
from zigpy.exceptions import DeliveryError
|
||||
|
||||
try:
|
||||
res = await self.cluster.bind()
|
||||
self.debug("bound '%s' cluster: %s", self.cluster.ep_attribute, res[0])
|
||||
except (DeliveryError, Timeout) as ex:
|
||||
self.debug(
|
||||
"Failed to bind '%s' cluster: %s", self.cluster.ep_attribute, str(ex)
|
||||
)
|
||||
|
||||
async def configure_reporting(
|
||||
self,
|
||||
attr,
|
||||
report_config=(
|
||||
REPORT_CONFIG_MIN_INT,
|
||||
REPORT_CONFIG_MAX_INT,
|
||||
REPORT_CONFIG_RPT_CHANGE,
|
||||
),
|
||||
):
|
||||
"""Configure attribute reporting for a cluster.
|
||||
|
||||
This also swallows DeliveryError exceptions that are thrown when
|
||||
devices are unreachable.
|
||||
"""
|
||||
from zigpy.exceptions import DeliveryError
|
||||
|
||||
attr_name = self.cluster.attributes.get(attr, [attr])[0]
|
||||
|
||||
kwargs = {}
|
||||
if self.cluster.cluster_id >= 0xFC00 and self.device.manufacturer_code:
|
||||
kwargs["manufacturer"] = self.device.manufacturer_code
|
||||
|
||||
min_report_int, max_report_int, reportable_change = report_config
|
||||
try:
|
||||
res = await self.cluster.configure_reporting(
|
||||
attr, min_report_int, max_report_int, reportable_change, **kwargs
|
||||
)
|
||||
self.debug(
|
||||
"reporting '%s' attr on '%s' cluster: %d/%d/%d: Result: '%s'",
|
||||
attr_name,
|
||||
self.cluster.ep_attribute,
|
||||
min_report_int,
|
||||
max_report_int,
|
||||
reportable_change,
|
||||
res,
|
||||
)
|
||||
except (DeliveryError, Timeout) as ex:
|
||||
self.debug(
|
||||
"failed to set reporting for '%s' attr on '%s' cluster: %s",
|
||||
attr_name,
|
||||
self.cluster.ep_attribute,
|
||||
str(ex),
|
||||
)
|
||||
|
||||
async def async_configure(self):
|
||||
"""Set cluster binding and attribute reporting."""
|
||||
manufacturer = None
|
||||
manufacturer_code = self._zha_device.manufacturer_code
|
||||
# Xiaomi devices don't need this and it disrupts pairing
|
||||
if self._zha_device.manufacturer != "LUMI":
|
||||
if self.cluster.cluster_id >= 0xFC00 and manufacturer_code:
|
||||
manufacturer = manufacturer_code
|
||||
await bind_cluster(self._unique_id, self.cluster)
|
||||
if not self.cluster.bind_only:
|
||||
await self.bind()
|
||||
if self.cluster.cluster_id not in self.cluster.endpoint.out_clusters:
|
||||
for report_config in self._report_config:
|
||||
attr = report_config.get("attr")
|
||||
min_report_interval, max_report_interval, change = report_config.get(
|
||||
"config"
|
||||
)
|
||||
await configure_reporting(
|
||||
self._unique_id,
|
||||
self.cluster,
|
||||
attr,
|
||||
min_report=min_report_interval,
|
||||
max_report=max_report_interval,
|
||||
reportable_change=change,
|
||||
manufacturer=manufacturer,
|
||||
await self.configure_reporting(
|
||||
report_config["attr"], report_config["config"]
|
||||
)
|
||||
await asyncio.sleep(uniform(0.1, 0.5))
|
||||
|
||||
|
@ -10,7 +10,7 @@ from homeassistant.core import callback
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
|
||||
from . import ZigbeeChannel
|
||||
from ..const import SIGNAL_ATTR_UPDATED
|
||||
from ..const import REPORT_CONFIG_IMMEDIATE, SIGNAL_ATTR_UPDATED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -19,6 +19,7 @@ class DoorLockChannel(ZigbeeChannel):
|
||||
"""Door lock channel."""
|
||||
|
||||
_value_attribute = 0
|
||||
REPORT_CONFIG = ({"attr": "lock_state", "config": REPORT_CONFIG_IMMEDIATE},)
|
||||
|
||||
async def async_update(self):
|
||||
"""Retrieve latest state."""
|
||||
|
@ -11,7 +11,14 @@ from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
from homeassistant.helpers.event import async_call_later
|
||||
|
||||
from . import ZigbeeChannel, parse_and_log_command
|
||||
from ..const import SIGNAL_ATTR_UPDATED, SIGNAL_MOVE_LEVEL, SIGNAL_SET_LEVEL
|
||||
from ..const import (
|
||||
SIGNAL_ATTR_UPDATED,
|
||||
SIGNAL_MOVE_LEVEL,
|
||||
SIGNAL_SET_LEVEL,
|
||||
REPORT_CONFIG_ASAP,
|
||||
REPORT_CONFIG_BATTERY_SAVE,
|
||||
REPORT_CONFIG_IMMEDIATE,
|
||||
)
|
||||
from ..helpers import get_attr_id_by_name
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -21,6 +28,7 @@ class OnOffChannel(ZigbeeChannel):
|
||||
"""Channel for the OnOff Zigbee cluster."""
|
||||
|
||||
ON_OFF = 0
|
||||
REPORT_CONFIG = ({"attr": "on_off", "config": REPORT_CONFIG_IMMEDIATE},)
|
||||
|
||||
def __init__(self, cluster, device):
|
||||
"""Initialize OnOffChannel."""
|
||||
@ -93,6 +101,7 @@ class LevelControlChannel(ZigbeeChannel):
|
||||
"""Channel for the LevelControl Zigbee cluster."""
|
||||
|
||||
CURRENT_LEVEL = 0
|
||||
REPORT_CONFIG = ({"attr": "current_level", "config": REPORT_CONFIG_ASAP},)
|
||||
|
||||
@callback
|
||||
def cluster_command(self, tsn, command_id, args):
|
||||
@ -173,6 +182,11 @@ class BasicChannel(ZigbeeChannel):
|
||||
class PowerConfigurationChannel(ZigbeeChannel):
|
||||
"""Channel for the zigbee power configuration cluster."""
|
||||
|
||||
REPORT_CONFIG = (
|
||||
{"attr": "battery_voltage", "config": REPORT_CONFIG_BATTERY_SAVE},
|
||||
{"attr": "battery_percentage_remaining", "config": REPORT_CONFIG_BATTERY_SAVE},
|
||||
)
|
||||
|
||||
@callback
|
||||
def attribute_updated(self, attrid, value):
|
||||
"""Handle attribute updates on this cluster."""
|
||||
|
@ -9,7 +9,11 @@ import logging
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
|
||||
from . import AttributeListeningChannel
|
||||
from ..const import CHANNEL_ELECTRICAL_MEASUREMENT, SIGNAL_ATTR_UPDATED
|
||||
from ..const import (
|
||||
CHANNEL_ELECTRICAL_MEASUREMENT,
|
||||
REPORT_CONFIG_DEFAULT,
|
||||
SIGNAL_ATTR_UPDATED,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -18,6 +22,7 @@ class ElectricalMeasurementChannel(AttributeListeningChannel):
|
||||
"""Channel that polls active power level."""
|
||||
|
||||
CHANNEL_NAME = CHANNEL_ELECTRICAL_MEASUREMENT
|
||||
REPORT_CONFIG = ({"attr": "active_power", "config": REPORT_CONFIG_DEFAULT},)
|
||||
|
||||
async def async_update(self):
|
||||
"""Retrieve latest state."""
|
||||
|
@ -10,7 +10,7 @@ from homeassistant.core import callback
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
|
||||
from . import ZigbeeChannel
|
||||
from ..const import SIGNAL_ATTR_UPDATED
|
||||
from ..const import REPORT_CONFIG_OP, SIGNAL_ATTR_UPDATED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -19,6 +19,7 @@ class FanChannel(ZigbeeChannel):
|
||||
"""Fan channel."""
|
||||
|
||||
_value_attribute = 0
|
||||
REPORT_CONFIG = ({"attr": "fan_mode", "config": REPORT_CONFIG_OP},)
|
||||
|
||||
async def async_set_speed(self, value) -> None:
|
||||
"""Set the speed of the fan."""
|
||||
|
@ -7,6 +7,7 @@ https://home-assistant.io/components/zha/
|
||||
import logging
|
||||
|
||||
from . import ZigbeeChannel
|
||||
from ..const import REPORT_CONFIG_DEFAULT
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -17,6 +18,11 @@ class ColorChannel(ZigbeeChannel):
|
||||
CAPABILITIES_COLOR_XY = 0x08
|
||||
CAPABILITIES_COLOR_TEMP = 0x10
|
||||
UNSUPPORTED_ATTRIBUTE = 0x86
|
||||
REPORT_CONFIG = (
|
||||
{"attr": "current_x", "config": REPORT_CONFIG_DEFAULT},
|
||||
{"attr": "current_y", "config": REPORT_CONFIG_DEFAULT},
|
||||
{"attr": "color_temperature", "config": REPORT_CONFIG_DEFAULT},
|
||||
)
|
||||
|
||||
def __init__(self, cluster, device):
|
||||
"""Initialize ColorChannel."""
|
||||
|
@ -11,7 +11,6 @@ from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
|
||||
from . import ZigbeeChannel
|
||||
from ..const import SIGNAL_ATTR_UPDATED
|
||||
from ..helpers import bind_cluster
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -39,13 +38,14 @@ class IASZoneChannel(ZigbeeChannel):
|
||||
"""Configure IAS device."""
|
||||
# Xiaomi devices don't need this and it disrupts pairing
|
||||
if self._zha_device.manufacturer == "LUMI":
|
||||
self.debug("%s: finished IASZoneChannel configuration")
|
||||
return
|
||||
from zigpy.exceptions import DeliveryError
|
||||
|
||||
self.debug("started IASZoneChannel configuration")
|
||||
|
||||
await bind_cluster(self.unique_id, self._cluster)
|
||||
ieee = self._cluster.endpoint.device.application.ieee
|
||||
await self.bind()
|
||||
ieee = self.cluster.endpoint.device.application.ieee
|
||||
|
||||
try:
|
||||
res = await self._cluster.write_attributes({"cie_addr": ieee})
|
||||
|
@ -65,7 +65,7 @@ from .const import (
|
||||
from .device import DeviceStatus, ZHADevice
|
||||
from .discovery import async_dispatch_discovery_info, async_process_endpoint
|
||||
from .patches import apply_application_controller_patch
|
||||
from .registries import INPUT_BIND_ONLY_CLUSTERS, RADIO_TYPES
|
||||
from .registries import RADIO_TYPES
|
||||
from .store import async_get_registry
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -342,14 +342,6 @@ class ZHAGateway:
|
||||
zha_device,
|
||||
is_new_join,
|
||||
)
|
||||
if endpoint_id != 0:
|
||||
for cluster in endpoint.in_clusters.values():
|
||||
cluster.bind_only = (
|
||||
cluster.cluster_id in INPUT_BIND_ONLY_CLUSTERS
|
||||
)
|
||||
for cluster in endpoint.out_clusters.values():
|
||||
# output clusters are always bind only
|
||||
cluster.bind_only = True
|
||||
else:
|
||||
is_rejoin = is_new_join is True
|
||||
_LOGGER.debug(
|
||||
|
@ -6,20 +6,11 @@ https://home-assistant.io/components/zha/
|
||||
"""
|
||||
import asyncio
|
||||
import collections
|
||||
from concurrent.futures import TimeoutError as Timeout
|
||||
import logging
|
||||
|
||||
from homeassistant.core import callback
|
||||
|
||||
from .const import (
|
||||
CLUSTER_TYPE_IN,
|
||||
CLUSTER_TYPE_OUT,
|
||||
DEFAULT_BAUDRATE,
|
||||
REPORT_CONFIG_MAX_INT,
|
||||
REPORT_CONFIG_MIN_INT,
|
||||
REPORT_CONFIG_RPT_CHANGE,
|
||||
RadioType,
|
||||
)
|
||||
from .const import CLUSTER_TYPE_IN, CLUSTER_TYPE_OUT, DEFAULT_BAUDRATE, RadioType
|
||||
from .registries import BINDABLE_CLUSTERS
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -48,104 +39,6 @@ async def safe_read(
|
||||
return {}
|
||||
|
||||
|
||||
async def bind_cluster(entity_id, cluster):
|
||||
"""Bind a zigbee cluster.
|
||||
|
||||
This also swallows DeliveryError exceptions that are thrown when devices
|
||||
are unreachable.
|
||||
"""
|
||||
from zigpy.exceptions import DeliveryError
|
||||
|
||||
cluster_name = cluster.ep_attribute
|
||||
try:
|
||||
res = await cluster.bind()
|
||||
_LOGGER.debug("%s: bound '%s' cluster: %s", entity_id, cluster_name, res[0])
|
||||
except (DeliveryError, Timeout) as ex:
|
||||
_LOGGER.debug(
|
||||
"%s: Failed to bind '%s' cluster: %s", entity_id, cluster_name, str(ex)
|
||||
)
|
||||
|
||||
|
||||
async def configure_reporting(
|
||||
entity_id,
|
||||
cluster,
|
||||
attr,
|
||||
min_report=REPORT_CONFIG_MIN_INT,
|
||||
max_report=REPORT_CONFIG_MAX_INT,
|
||||
reportable_change=REPORT_CONFIG_RPT_CHANGE,
|
||||
manufacturer=None,
|
||||
):
|
||||
"""Configure attribute reporting for a cluster.
|
||||
|
||||
This also swallows DeliveryError exceptions that are thrown when devices
|
||||
are unreachable.
|
||||
"""
|
||||
from zigpy.exceptions import DeliveryError
|
||||
|
||||
attr_name = cluster.attributes.get(attr, [attr])[0]
|
||||
|
||||
if isinstance(attr, str):
|
||||
attr_id = get_attr_id_by_name(cluster, attr_name)
|
||||
else:
|
||||
attr_id = attr
|
||||
|
||||
cluster_name = cluster.ep_attribute
|
||||
kwargs = {}
|
||||
if manufacturer:
|
||||
kwargs["manufacturer"] = manufacturer
|
||||
try:
|
||||
res = await cluster.configure_reporting(
|
||||
attr_id, min_report, max_report, reportable_change, **kwargs
|
||||
)
|
||||
_LOGGER.debug(
|
||||
"%s: reporting '%s' attr on '%s' cluster: %d/%d/%d: Result: '%s'",
|
||||
entity_id,
|
||||
attr_name,
|
||||
cluster_name,
|
||||
min_report,
|
||||
max_report,
|
||||
reportable_change,
|
||||
res,
|
||||
)
|
||||
except (DeliveryError, Timeout) as ex:
|
||||
_LOGGER.debug(
|
||||
"%s: failed to set reporting for '%s' attr on '%s' cluster: %s",
|
||||
entity_id,
|
||||
attr_name,
|
||||
cluster_name,
|
||||
str(ex),
|
||||
)
|
||||
|
||||
|
||||
async def bind_configure_reporting(
|
||||
entity_id,
|
||||
cluster,
|
||||
attr,
|
||||
skip_bind=False,
|
||||
min_report=REPORT_CONFIG_MIN_INT,
|
||||
max_report=REPORT_CONFIG_MAX_INT,
|
||||
reportable_change=REPORT_CONFIG_RPT_CHANGE,
|
||||
manufacturer=None,
|
||||
):
|
||||
"""Bind and configure zigbee attribute reporting for a cluster.
|
||||
|
||||
This also swallows DeliveryError exceptions that are thrown when devices
|
||||
are unreachable.
|
||||
"""
|
||||
if not skip_bind:
|
||||
await bind_cluster(entity_id, cluster)
|
||||
|
||||
await configure_reporting(
|
||||
entity_id,
|
||||
cluster,
|
||||
attr,
|
||||
min_report=min_report,
|
||||
max_report=max_report,
|
||||
reportable_change=reportable_change,
|
||||
manufacturer=manufacturer,
|
||||
)
|
||||
|
||||
|
||||
async def check_zigpy_connection(usb_path, radio_type, database_path):
|
||||
"""Test zigpy radio connection."""
|
||||
if radio_type == RadioType.ezsp.name:
|
||||
|
@ -20,7 +20,6 @@ from .const import (
|
||||
REPORT_CONFIG_IMMEDIATE,
|
||||
REPORT_CONFIG_MAX_INT,
|
||||
REPORT_CONFIG_MIN_INT,
|
||||
REPORT_CONFIG_OP,
|
||||
SENSOR_ACCELERATION,
|
||||
SENSOR_BATTERY,
|
||||
SENSOR_ELECTRICAL_MEASUREMENT,
|
||||
@ -46,7 +45,6 @@ CUSTOM_CLUSTER_MAPPINGS = {}
|
||||
DEVICE_CLASS = {}
|
||||
DEVICE_TRACKER_CLUSTERS = set()
|
||||
EVENT_RELAY_CLUSTERS = []
|
||||
INPUT_BIND_ONLY_CLUSTERS = []
|
||||
LIGHT_CLUSTERS = set()
|
||||
OUTPUT_CHANNEL_ONLY_CLUSTERS = []
|
||||
RADIO_TYPES = {}
|
||||
@ -145,31 +143,6 @@ def establish_device_mappings():
|
||||
|
||||
CLUSTER_REPORT_CONFIGS.update(
|
||||
{
|
||||
zcl.clusters.general.Alarms.cluster_id: [],
|
||||
zcl.clusters.general.Basic.cluster_id: [],
|
||||
zcl.clusters.general.Commissioning.cluster_id: [],
|
||||
zcl.clusters.general.Identify.cluster_id: [],
|
||||
zcl.clusters.general.Groups.cluster_id: [],
|
||||
zcl.clusters.general.Scenes.cluster_id: [],
|
||||
zcl.clusters.general.Partition.cluster_id: [],
|
||||
zcl.clusters.general.Ota.cluster_id: [],
|
||||
zcl.clusters.general.PowerProfile.cluster_id: [],
|
||||
zcl.clusters.general.ApplianceControl.cluster_id: [],
|
||||
zcl.clusters.general.PollControl.cluster_id: [],
|
||||
zcl.clusters.general.GreenPowerProxy.cluster_id: [],
|
||||
zcl.clusters.general.OnOffConfiguration.cluster_id: [],
|
||||
zcl.clusters.lightlink.LightLink.cluster_id: [],
|
||||
zcl.clusters.general.OnOff.cluster_id: [
|
||||
{"attr": "on_off", "config": REPORT_CONFIG_IMMEDIATE}
|
||||
],
|
||||
zcl.clusters.general.LevelControl.cluster_id: [
|
||||
{"attr": "current_level", "config": REPORT_CONFIG_ASAP}
|
||||
],
|
||||
zcl.clusters.lighting.Color.cluster_id: [
|
||||
{"attr": "current_x", "config": REPORT_CONFIG_DEFAULT},
|
||||
{"attr": "current_y", "config": REPORT_CONFIG_DEFAULT},
|
||||
{"attr": "color_temperature", "config": REPORT_CONFIG_DEFAULT},
|
||||
],
|
||||
zcl.clusters.measurement.RelativeHumidity.cluster_id: [
|
||||
{
|
||||
"attr": "measured_value",
|
||||
@ -203,25 +176,9 @@ def establish_device_mappings():
|
||||
zcl.clusters.smartenergy.Metering.cluster_id: [
|
||||
{"attr": "instantaneous_demand", "config": REPORT_CONFIG_DEFAULT}
|
||||
],
|
||||
zcl.clusters.homeautomation.ElectricalMeasurement.cluster_id: [
|
||||
{"attr": "active_power", "config": REPORT_CONFIG_DEFAULT}
|
||||
],
|
||||
zcl.clusters.general.PowerConfiguration.cluster_id: [
|
||||
{"attr": "battery_voltage", "config": REPORT_CONFIG_DEFAULT},
|
||||
{
|
||||
"attr": "battery_percentage_remaining",
|
||||
"config": REPORT_CONFIG_DEFAULT,
|
||||
},
|
||||
],
|
||||
zcl.clusters.measurement.OccupancySensing.cluster_id: [
|
||||
{"attr": "occupancy", "config": REPORT_CONFIG_IMMEDIATE}
|
||||
],
|
||||
zcl.clusters.hvac.Fan.cluster_id: [
|
||||
{"attr": "fan_mode", "config": REPORT_CONFIG_OP}
|
||||
],
|
||||
zcl.clusters.closures.DoorLock.cluster_id: [
|
||||
{"attr": "lock_state", "config": REPORT_CONFIG_IMMEDIATE}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
@ -260,8 +217,6 @@ def establish_device_mappings():
|
||||
EVENT_RELAY_CLUSTERS.append(zcl.clusters.general.LevelControl.cluster_id)
|
||||
EVENT_RELAY_CLUSTERS.append(zcl.clusters.general.OnOff.cluster_id)
|
||||
|
||||
INPUT_BIND_ONLY_CLUSTERS.append(zcl.clusters.lightlink.LightLink.cluster_id)
|
||||
|
||||
OUTPUT_CHANNEL_ONLY_CLUSTERS.append(zcl.clusters.general.Scenes.cluster_id)
|
||||
|
||||
LIGHT_CLUSTERS.add(zcl.clusters.general.LevelControl.cluster_id)
|
||||
|
@ -68,12 +68,13 @@ class FakeEndpoint:
|
||||
def patch_cluster(cluster):
|
||||
"""Patch a cluster for testing."""
|
||||
cluster.bind = CoroutineMock(return_value=[0])
|
||||
cluster.configure_reporting = CoroutineMock(return_value=[0])
|
||||
cluster.deserialize = Mock()
|
||||
cluster.handle_cluster_request = Mock()
|
||||
cluster.handle_cluster_general_request = Mock()
|
||||
cluster.read_attributes = CoroutineMock()
|
||||
cluster.read_attributes_raw = Mock()
|
||||
cluster.read_attributes = Mock()
|
||||
cluster.unbind = Mock()
|
||||
cluster.unbind = CoroutineMock(return_value=[0])
|
||||
|
||||
|
||||
class FakeDevice:
|
||||
|
136
tests/components/zha/test_channels.py
Normal file
136
tests/components/zha/test_channels.py
Normal file
@ -0,0 +1,136 @@
|
||||
"""Test ZHA Core channels."""
|
||||
import pytest
|
||||
import zigpy.types as t
|
||||
|
||||
import homeassistant.components.zha.core.channels as channels
|
||||
import homeassistant.components.zha.core.channels.registry as channel_reg
|
||||
import homeassistant.components.zha.core.device as zha_device
|
||||
|
||||
from .common import make_device
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ieee():
|
||||
"""IEEE fixture."""
|
||||
return t.EUI64.deserialize(b"ieeeaddr")[0]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def nwk():
|
||||
"""NWK fixture."""
|
||||
return t.NWK(0xBEEF)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"cluster_id, bind_count, attrs",
|
||||
[
|
||||
(0x0000, 1, {}),
|
||||
(0x0001, 1, {"battery_voltage", "battery_percentage_remaining"}),
|
||||
(0x0003, 1, {}),
|
||||
(0x0004, 1, {}),
|
||||
(0x0005, 1, {}),
|
||||
(0x0006, 1, {"on_off"}),
|
||||
(0x0007, 1, {}),
|
||||
(0x0008, 1, {"current_level"}),
|
||||
(0x0009, 1, {}),
|
||||
(0x0015, 1, {}),
|
||||
(0x0016, 1, {}),
|
||||
(0x0019, 1, {}),
|
||||
(0x001A, 1, {}),
|
||||
(0x001B, 1, {}),
|
||||
(0x0020, 1, {}),
|
||||
(0x0021, 1, {}),
|
||||
(0x0101, 1, {"lock_state"}),
|
||||
(0x0202, 1, {"fan_mode"}),
|
||||
(0x0300, 1, {"current_x", "current_y", "color_temperature"}),
|
||||
(0x0400, 1, {"measured_value"}),
|
||||
(0x0402, 1, {"measured_value"}),
|
||||
(0x0403, 1, {"measured_value"}),
|
||||
(0x0405, 1, {"measured_value"}),
|
||||
(0x0406, 1, {"occupancy"}),
|
||||
(0x0702, 1, {"instantaneous_demand"}),
|
||||
(0x0B04, 1, {"active_power"}),
|
||||
(0x1000, 1, {}),
|
||||
],
|
||||
)
|
||||
async def test_in_channel_config(cluster_id, bind_count, attrs, zha_gateway, hass):
|
||||
"""Test ZHA core channel configuration for input clusters."""
|
||||
zigpy_dev = make_device(
|
||||
[cluster_id],
|
||||
[],
|
||||
0x1234,
|
||||
"00:11:22:33:44:55:66:77",
|
||||
"test manufacturer",
|
||||
"test model",
|
||||
)
|
||||
zha_dev = zha_device.ZHADevice(hass, zigpy_dev, zha_gateway)
|
||||
|
||||
cluster = zigpy_dev.endpoints[1].in_clusters[cluster_id]
|
||||
channel_class = channel_reg.ZIGBEE_CHANNEL_REGISTRY.get(
|
||||
cluster_id, channels.AttributeListeningChannel
|
||||
)
|
||||
channel = channel_class(cluster, zha_dev)
|
||||
|
||||
await channel.async_configure()
|
||||
|
||||
assert cluster.bind.call_count == bind_count
|
||||
assert cluster.configure_reporting.call_count == len(attrs)
|
||||
reported_attrs = {attr[0][0] for attr in cluster.configure_reporting.call_args_list}
|
||||
assert set(attrs) == reported_attrs
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"cluster_id, bind_count",
|
||||
[
|
||||
(0x0000, 1),
|
||||
(0x0001, 1),
|
||||
(0x0003, 1),
|
||||
(0x0004, 1),
|
||||
(0x0005, 1),
|
||||
(0x0006, 1),
|
||||
(0x0007, 1),
|
||||
(0x0008, 1),
|
||||
(0x0009, 1),
|
||||
(0x0015, 1),
|
||||
(0x0016, 1),
|
||||
(0x0019, 1),
|
||||
(0x001A, 1),
|
||||
(0x001B, 1),
|
||||
(0x0020, 1),
|
||||
(0x0021, 1),
|
||||
(0x0101, 1),
|
||||
(0x0202, 1),
|
||||
(0x0300, 1),
|
||||
(0x0400, 1),
|
||||
(0x0402, 1),
|
||||
(0x0403, 1),
|
||||
(0x0405, 1),
|
||||
(0x0406, 1),
|
||||
(0x0702, 1),
|
||||
(0x0B04, 1),
|
||||
(0x1000, 1),
|
||||
],
|
||||
)
|
||||
async def test_out_channel_config(cluster_id, bind_count, zha_gateway, hass):
|
||||
"""Test ZHA core channel configuration for output clusters."""
|
||||
zigpy_dev = make_device(
|
||||
[],
|
||||
[cluster_id],
|
||||
0x1234,
|
||||
"00:11:22:33:44:55:66:77",
|
||||
"test manufacturer",
|
||||
"test model",
|
||||
)
|
||||
zha_dev = zha_device.ZHADevice(hass, zigpy_dev, zha_gateway)
|
||||
|
||||
cluster = zigpy_dev.endpoints[1].out_clusters[cluster_id]
|
||||
cluster.bind_only = True
|
||||
channel_class = channel_reg.ZIGBEE_CHANNEL_REGISTRY.get(
|
||||
cluster_id, channels.AttributeListeningChannel
|
||||
)
|
||||
channel = channel_class(cluster, zha_dev)
|
||||
|
||||
await channel.async_configure()
|
||||
|
||||
assert cluster.bind.call_count == bind_count
|
||||
assert cluster.configure_reporting.call_count == 0
|
Loading…
x
Reference in New Issue
Block a user