mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 13:17:32 +00:00
ZHA channel ZCL attributes initialization (#56476)
* Add dict of attributes to initialize * Refactor get_attributes() method Read 5 attributes at the time. * Add ZCL_INIT_ATTRS attribute to base Zigbee channel * Update tests and general clusters * Update channels to use ZCL_INIT_ATTRS * Update channels to use ZCL_INIT_ATTRS * Fix tests * Refactor async_initialize() to be a retryable request * Maky pylint happy again
This commit is contained in:
parent
2478ec887a
commit
a5d405700c
@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
from functools import wraps
|
||||
from functools import partialmethod, wraps
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
@ -30,8 +30,9 @@ from ..const import (
|
||||
ZHA_CHANNEL_MSG_BIND,
|
||||
ZHA_CHANNEL_MSG_CFG_RPT,
|
||||
ZHA_CHANNEL_MSG_DATA,
|
||||
ZHA_CHANNEL_READS_PER_REQ,
|
||||
)
|
||||
from ..helpers import LogMixin, safe_read
|
||||
from ..helpers import LogMixin, retryable_req, safe_read
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -92,6 +93,11 @@ class ZigbeeChannel(LogMixin):
|
||||
REPORT_CONFIG: tuple[dict[int | str, tuple[int, int, int | float]]] = ()
|
||||
BIND: bool = True
|
||||
|
||||
# Dict of attributes to read on channel initialization.
|
||||
# Dict keys -- attribute ID or names, with bool value indicating whether a cached
|
||||
# attribute read is acceptable.
|
||||
ZCL_INIT_ATTRS: dict[int | str, bool] = {}
|
||||
|
||||
def __init__(
|
||||
self, cluster: zha_typing.ZigpyClusterType, ch_pool: zha_typing.ChannelPoolType
|
||||
) -> None:
|
||||
@ -301,6 +307,7 @@ class ZigbeeChannel(LogMixin):
|
||||
self.debug("skipping channel configuration")
|
||||
self._status = ChannelStatus.CONFIGURED
|
||||
|
||||
@retryable_req(delays=(1, 1, 3))
|
||||
async def async_initialize(self, from_cache: bool) -> None:
|
||||
"""Initialize channel."""
|
||||
if not from_cache and self._ch_pool.skip_configuration:
|
||||
@ -308,9 +315,14 @@ class ZigbeeChannel(LogMixin):
|
||||
return
|
||||
|
||||
self.debug("initializing channel: from_cache: %s", from_cache)
|
||||
attributes = [cfg["attr"] for cfg in self.REPORT_CONFIG]
|
||||
if attributes:
|
||||
await self.get_attributes(attributes, from_cache=from_cache)
|
||||
cached = [a for a, cached in self.ZCL_INIT_ATTRS.items() if cached]
|
||||
uncached = [a for a, cached in self.ZCL_INIT_ATTRS.items() if not cached]
|
||||
uncached.extend([cfg["attr"] for cfg in self.REPORT_CONFIG])
|
||||
|
||||
if cached:
|
||||
await self._get_attributes(True, cached, from_cache=True)
|
||||
if uncached:
|
||||
await self._get_attributes(True, uncached, from_cache=from_cache)
|
||||
|
||||
ch_specific_init = getattr(self, "async_initialize_channel_specific", None)
|
||||
if ch_specific_init:
|
||||
@ -367,28 +379,43 @@ class ZigbeeChannel(LogMixin):
|
||||
)
|
||||
return result.get(attribute)
|
||||
|
||||
async def get_attributes(self, attributes, from_cache=True):
|
||||
async def _get_attributes(
|
||||
self,
|
||||
raise_exceptions: bool,
|
||||
attributes: list[int | str],
|
||||
from_cache: bool = True,
|
||||
) -> dict[int | str, Any]:
|
||||
"""Get the values for a list of attributes."""
|
||||
manufacturer = None
|
||||
manufacturer_code = self._ch_pool.manufacturer_code
|
||||
if self.cluster.cluster_id >= 0xFC00 and manufacturer_code:
|
||||
manufacturer = manufacturer_code
|
||||
try:
|
||||
result, _ = await self.cluster.read_attributes(
|
||||
attributes,
|
||||
allow_cache=from_cache,
|
||||
only_cache=from_cache and not self._ch_pool.is_mains_powered,
|
||||
manufacturer=manufacturer,
|
||||
)
|
||||
return result
|
||||
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex:
|
||||
self.debug(
|
||||
"failed to get attributes '%s' on '%s' cluster: %s",
|
||||
attributes,
|
||||
self.cluster.ep_attribute,
|
||||
str(ex),
|
||||
)
|
||||
return {}
|
||||
chunk = attributes[:ZHA_CHANNEL_READS_PER_REQ]
|
||||
rest = attributes[ZHA_CHANNEL_READS_PER_REQ:]
|
||||
result = {}
|
||||
while chunk:
|
||||
try:
|
||||
read, _ = await self.cluster.read_attributes(
|
||||
attributes,
|
||||
allow_cache=from_cache,
|
||||
only_cache=from_cache and not self._ch_pool.is_mains_powered,
|
||||
manufacturer=manufacturer,
|
||||
)
|
||||
result.update(read)
|
||||
except (asyncio.TimeoutError, zigpy.exceptions.ZigbeeException) as ex:
|
||||
self.debug(
|
||||
"failed to get attributes '%s' on '%s' cluster: %s",
|
||||
attributes,
|
||||
self.cluster.ep_attribute,
|
||||
str(ex),
|
||||
)
|
||||
if raise_exceptions:
|
||||
raise
|
||||
chunk = rest[:ZHA_CHANNEL_READS_PER_REQ]
|
||||
rest = rest[ZHA_CHANNEL_READS_PER_REQ:]
|
||||
return result
|
||||
|
||||
get_attributes = partialmethod(_get_attributes, False)
|
||||
|
||||
def log(self, level, msg, *args):
|
||||
"""Log a message."""
|
||||
|
@ -23,7 +23,6 @@ from ..const import (
|
||||
SIGNAL_SET_LEVEL,
|
||||
SIGNAL_UPDATE_DEVICE,
|
||||
)
|
||||
from ..helpers import retryable_req
|
||||
from .base import ClientChannel, ZigbeeChannel, parse_and_log_command
|
||||
|
||||
|
||||
@ -44,7 +43,16 @@ class AnalogInput(ZigbeeChannel):
|
||||
class AnalogOutput(ZigbeeChannel):
|
||||
"""Analog Output channel."""
|
||||
|
||||
REPORT_CONFIG = [{"attr": "present_value", "config": REPORT_CONFIG_DEFAULT}]
|
||||
REPORT_CONFIG = ({"attr": "present_value", "config": REPORT_CONFIG_DEFAULT},)
|
||||
ZCL_INIT_ATTRS = {
|
||||
"min_present_value": True,
|
||||
"max_present_value": True,
|
||||
"resolution": True,
|
||||
"relinquish_default": True,
|
||||
"description": True,
|
||||
"engineering_units": True,
|
||||
"application_type": True,
|
||||
}
|
||||
|
||||
@property
|
||||
def present_value(self) -> float | None:
|
||||
@ -99,25 +107,6 @@ class AnalogOutput(ZigbeeChannel):
|
||||
return True
|
||||
return False
|
||||
|
||||
@retryable_req(delays=(1, 1, 3))
|
||||
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||
"""Initialize channel."""
|
||||
return self.fetch_config(from_cache)
|
||||
|
||||
async def fetch_config(self, from_cache: bool) -> None:
|
||||
"""Get the channel configuration."""
|
||||
attributes = [
|
||||
"min_present_value",
|
||||
"max_present_value",
|
||||
"resolution",
|
||||
"relinquish_default",
|
||||
"description",
|
||||
"engineering_units",
|
||||
"application_type",
|
||||
]
|
||||
# just populates the cache, if not already done
|
||||
await self.get_attributes(attributes, from_cache=from_cache)
|
||||
|
||||
|
||||
@registries.ZIGBEE_CHANNEL_REGISTRY.register(general.AnalogValue.cluster_id)
|
||||
class AnalogValue(ZigbeeChannel):
|
||||
|
@ -1,8 +1,6 @@
|
||||
"""Home automation channels module for Zigbee Home Automation."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Coroutine
|
||||
|
||||
from zigpy.zcl.clusters import homeautomation
|
||||
|
||||
from .. import registries
|
||||
@ -49,6 +47,12 @@ class ElectricalMeasurementChannel(ZigbeeChannel):
|
||||
CHANNEL_NAME = CHANNEL_ELECTRICAL_MEASUREMENT
|
||||
|
||||
REPORT_CONFIG = ({"attr": "active_power", "config": REPORT_CONFIG_DEFAULT},)
|
||||
ZCL_INIT_ATTRS = {
|
||||
"ac_power_divisor": True,
|
||||
"power_divisor": True,
|
||||
"ac_power_multiplier": True,
|
||||
"power_multiplier": True,
|
||||
}
|
||||
|
||||
async def async_update(self):
|
||||
"""Retrieve latest state."""
|
||||
@ -64,19 +68,6 @@ class ElectricalMeasurementChannel(ZigbeeChannel):
|
||||
result,
|
||||
)
|
||||
|
||||
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||
"""Initialize channel specific attributes."""
|
||||
|
||||
return self.get_attributes(
|
||||
[
|
||||
"ac_power_divisor",
|
||||
"power_divisor",
|
||||
"ac_power_multiplier",
|
||||
"power_multiplier",
|
||||
],
|
||||
from_cache=True,
|
||||
)
|
||||
|
||||
@property
|
||||
def divisor(self) -> int | None:
|
||||
"""Return active power divisor."""
|
||||
|
@ -15,14 +15,13 @@ from zigpy.zcl.foundation import Status
|
||||
|
||||
from homeassistant.core import callback
|
||||
|
||||
from .. import registries, typing as zha_typing
|
||||
from .. import registries
|
||||
from ..const import (
|
||||
REPORT_CONFIG_MAX_INT,
|
||||
REPORT_CONFIG_MIN_INT,
|
||||
REPORT_CONFIG_OP,
|
||||
SIGNAL_ATTR_UPDATED,
|
||||
)
|
||||
from ..helpers import retryable_req
|
||||
from .base import ZigbeeChannel
|
||||
|
||||
AttributeUpdateRecord = namedtuple("AttributeUpdateRecord", "attr_id, attr_name, value")
|
||||
@ -43,12 +42,18 @@ class FanChannel(ZigbeeChannel):
|
||||
_value_attribute = 0
|
||||
|
||||
REPORT_CONFIG = ({"attr": "fan_mode", "config": REPORT_CONFIG_OP},)
|
||||
ZCL_INIT_ATTRS = {"fan_mode_sequence": True}
|
||||
|
||||
@property
|
||||
def fan_mode(self) -> int | None:
|
||||
"""Return current fan mode."""
|
||||
return self.cluster.get("fan_mode")
|
||||
|
||||
@property
|
||||
def fan_mode_sequence(self) -> int | None:
|
||||
"""Return possible fan mode speeds."""
|
||||
return self.cluster.get("fan_mode_sequence")
|
||||
|
||||
async def async_set_speed(self, value) -> None:
|
||||
"""Set the speed of the fan."""
|
||||
|
||||
@ -97,34 +102,17 @@ class ThermostatChannel(ZigbeeChannel):
|
||||
{"attr": "pi_cooling_demand", "config": REPORT_CONFIG_CLIMATE_DEMAND},
|
||||
{"attr": "pi_heating_demand", "config": REPORT_CONFIG_CLIMATE_DEMAND},
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self, cluster: zha_typing.ZigpyClusterType, ch_pool: zha_typing.ChannelPoolType
|
||||
) -> None:
|
||||
"""Init Thermostat channel instance."""
|
||||
super().__init__(cluster, ch_pool)
|
||||
self._init_attrs = {
|
||||
"abs_min_heat_setpoint_limit": True,
|
||||
"abs_max_heat_setpoint_limit": True,
|
||||
"abs_min_cool_setpoint_limit": True,
|
||||
"abs_max_cool_setpoint_limit": True,
|
||||
"ctrl_seqe_of_oper": False,
|
||||
"local_temp": False,
|
||||
"max_cool_setpoint_limit": True,
|
||||
"max_heat_setpoint_limit": True,
|
||||
"min_cool_setpoint_limit": True,
|
||||
"min_heat_setpoint_limit": True,
|
||||
"occupancy": False,
|
||||
"occupied_cooling_setpoint": False,
|
||||
"occupied_heating_setpoint": False,
|
||||
"pi_cooling_demand": False,
|
||||
"pi_heating_demand": False,
|
||||
"running_mode": False,
|
||||
"running_state": False,
|
||||
"system_mode": False,
|
||||
"unoccupied_heating_setpoint": False,
|
||||
"unoccupied_cooling_setpoint": False,
|
||||
}
|
||||
ZCL_INIT_ATTRS: dict[int | str, bool] = {
|
||||
"abs_min_heat_setpoint_limit": True,
|
||||
"abs_max_heat_setpoint_limit": True,
|
||||
"abs_min_cool_setpoint_limit": True,
|
||||
"abs_max_cool_setpoint_limit": True,
|
||||
"ctrl_seqe_of_oper": False,
|
||||
"max_cool_setpoint_limit": True,
|
||||
"max_heat_setpoint_limit": True,
|
||||
"min_cool_setpoint_limit": True,
|
||||
"min_heat_setpoint_limit": True,
|
||||
}
|
||||
|
||||
@property
|
||||
def abs_max_cool_setpoint_limit(self) -> int:
|
||||
@ -250,32 +238,6 @@ class ThermostatChannel(ZigbeeChannel):
|
||||
AttributeUpdateRecord(attrid, attr_name, value),
|
||||
)
|
||||
|
||||
async def _chunk_attr_read(self, attrs, cached=False):
|
||||
chunk, attrs = attrs[:4], attrs[4:]
|
||||
while chunk:
|
||||
res, fail = await self.cluster.read_attributes(chunk, allow_cache=cached)
|
||||
self.debug("read attributes: Success: %s. Failed: %s", res, fail)
|
||||
for attr in chunk:
|
||||
self._init_attrs.pop(attr, None)
|
||||
if attr in fail:
|
||||
continue
|
||||
self.async_send_signal(
|
||||
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
|
||||
AttributeUpdateRecord(None, attr, res[attr]),
|
||||
)
|
||||
|
||||
chunk, attrs = attrs[:4], attrs[4:]
|
||||
|
||||
@retryable_req(delays=(1, 1, 3))
|
||||
async def async_initialize_channel_specific(self, from_cache: bool) -> None:
|
||||
"""Initialize channel."""
|
||||
|
||||
cached = [a for a, cached in self._init_attrs.items() if cached]
|
||||
uncached = [a for a, cached in self._init_attrs.items() if not cached]
|
||||
|
||||
await self._chunk_attr_read(cached, cached=True)
|
||||
await self._chunk_attr_read(uncached, cached=False)
|
||||
|
||||
async def async_set_operation_mode(self, mode) -> bool:
|
||||
"""Set Operation mode."""
|
||||
if not await self.write_attributes({"system_mode": mode}):
|
||||
|
@ -1,7 +1,6 @@
|
||||
"""Lighting channels module for Zigbee Home Automation."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Coroutine
|
||||
from contextlib import suppress
|
||||
|
||||
from zigpy.zcl.clusters import lighting
|
||||
@ -36,6 +35,12 @@ class ColorChannel(ZigbeeChannel):
|
||||
)
|
||||
MAX_MIREDS: int = 500
|
||||
MIN_MIREDS: int = 153
|
||||
ZCL_INIT_ATTRS = {
|
||||
"color_temp_physical_min": True,
|
||||
"color_temp_physical_max": True,
|
||||
"color_capabilities": True,
|
||||
"color_loop_active": False,
|
||||
}
|
||||
|
||||
@property
|
||||
def color_capabilities(self) -> int:
|
||||
@ -75,22 +80,3 @@ class ColorChannel(ZigbeeChannel):
|
||||
def max_mireds(self) -> int:
|
||||
"""Return the warmest color_temp that this channel supports."""
|
||||
return self.cluster.get("color_temp_physical_max", self.MAX_MIREDS)
|
||||
|
||||
def async_configure_channel_specific(self) -> Coroutine:
|
||||
"""Configure channel."""
|
||||
return self.fetch_color_capabilities(False)
|
||||
|
||||
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||
"""Initialize channel."""
|
||||
return self.fetch_color_capabilities(True)
|
||||
|
||||
async def fetch_color_capabilities(self, from_cache: bool) -> None:
|
||||
"""Get the color configuration."""
|
||||
attributes = [
|
||||
"color_temp_physical_min",
|
||||
"color_temp_physical_max",
|
||||
"color_capabilities",
|
||||
"color_temperature",
|
||||
]
|
||||
# just populates the cache, if not already done
|
||||
await self.get_attributes(attributes, from_cache=from_cache)
|
||||
|
@ -7,7 +7,6 @@ https://home-assistant.io/integrations/zha/
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Coroutine
|
||||
import logging
|
||||
|
||||
from zigpy.exceptions import ZigbeeException
|
||||
@ -345,6 +344,8 @@ class IasWd(ZigbeeChannel):
|
||||
class IASZoneChannel(ZigbeeChannel):
|
||||
"""Channel for the IASZone Zigbee cluster."""
|
||||
|
||||
ZCL_INIT_ATTRS = {"zone_status": True, "zone_state": False, "zone_type": True}
|
||||
|
||||
@callback
|
||||
def cluster_command(self, tsn, command_id, args):
|
||||
"""Handle commands received to this cluster."""
|
||||
@ -404,8 +405,3 @@ class IASZoneChannel(ZigbeeChannel):
|
||||
self.cluster.attributes.get(attrid, [attrid])[0],
|
||||
value,
|
||||
)
|
||||
|
||||
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||
"""Initialize channel."""
|
||||
attributes = ["zone_status", "zone_state", "zone_type"]
|
||||
return self.get_attributes(attributes, from_cache=from_cache)
|
||||
|
@ -1,8 +1,6 @@
|
||||
"""Smart energy channels module for Zigbee Home Automation."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Coroutine
|
||||
|
||||
from zigpy.zcl.clusters import smartenergy
|
||||
|
||||
from homeassistant.const import (
|
||||
@ -63,7 +61,13 @@ class Messaging(ZigbeeChannel):
|
||||
class Metering(ZigbeeChannel):
|
||||
"""Metering channel."""
|
||||
|
||||
REPORT_CONFIG = [{"attr": "instantaneous_demand", "config": REPORT_CONFIG_DEFAULT}]
|
||||
REPORT_CONFIG = ({"attr": "instantaneous_demand", "config": REPORT_CONFIG_DEFAULT},)
|
||||
ZCL_INIT_ATTRS = {
|
||||
"divisor": True,
|
||||
"multiplier": True,
|
||||
"unit_of_measure": True,
|
||||
"demand_formatting": True,
|
||||
}
|
||||
|
||||
unit_of_measure_map = {
|
||||
0x00: POWER_WATT,
|
||||
@ -98,14 +102,6 @@ class Metering(ZigbeeChannel):
|
||||
"""Return multiplier for the value."""
|
||||
return self.cluster.get("multiplier") or 1
|
||||
|
||||
def async_configure_channel_specific(self) -> Coroutine:
|
||||
"""Configure channel."""
|
||||
return self.fetch_config(False)
|
||||
|
||||
def async_initialize_channel_specific(self, from_cache: bool) -> Coroutine:
|
||||
"""Initialize channel."""
|
||||
return self.fetch_config(True)
|
||||
|
||||
@callback
|
||||
def attribute_updated(self, attrid: int, value: int) -> None:
|
||||
"""Handle attribute update from Metering cluster."""
|
||||
@ -119,14 +115,10 @@ class Metering(ZigbeeChannel):
|
||||
uom = self.cluster.get("unit_of_measure", 0x7F)
|
||||
return self.unit_of_measure_map.get(uom & 0x7F, "unknown")
|
||||
|
||||
async def fetch_config(self, from_cache: bool) -> None:
|
||||
async def async_initialize_channel_specific(self, from_cache: bool) -> None:
|
||||
"""Fetch config from device and updates format specifier."""
|
||||
results = await self.get_attributes(
|
||||
["divisor", "multiplier", "unit_of_measure", "demand_formatting"],
|
||||
from_cache=from_cache,
|
||||
)
|
||||
|
||||
fmting = results.get(
|
||||
fmting = self.cluster.get(
|
||||
"demand_formatting", 0xF9
|
||||
) # 1 digit to the right, 15 digits to the left
|
||||
|
||||
|
@ -374,6 +374,7 @@ ZHA_CHANNEL_MSG_BIND = "zha_channel_bind"
|
||||
ZHA_CHANNEL_MSG_CFG_RPT = "zha_channel_configure_reporting"
|
||||
ZHA_CHANNEL_MSG_DATA = "zha_channel_msg_data"
|
||||
ZHA_CHANNEL_CFG_DONE = "zha_channel_cfg_done"
|
||||
ZHA_CHANNEL_READS_PER_REQ = 5
|
||||
ZHA_GW_MSG = "zha_gateway_message"
|
||||
ZHA_GW_MSG_DEVICE_FULL_INIT = "device_fully_initialized"
|
||||
ZHA_GW_MSG_DEVICE_INFO = "device_info"
|
||||
|
@ -56,6 +56,18 @@ def patch_cluster(cluster):
|
||||
cluster.add = AsyncMock(return_value=[0])
|
||||
|
||||
|
||||
def update_attribute_cache(cluster):
|
||||
"""Update attribute cache based on plugged attributes."""
|
||||
if cluster.PLUGGED_ATTR_READS:
|
||||
attrs = [
|
||||
make_attribute(cluster.attridx.get(attr, attr), value)
|
||||
for attr, value in cluster.PLUGGED_ATTR_READS.items()
|
||||
]
|
||||
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
|
||||
hdr.frame_control.disable_default_response = True
|
||||
cluster.handle_message(hdr, [attrs])
|
||||
|
||||
|
||||
def get_zha_gateway(hass):
|
||||
"""Return ZHA gateway from hass.data."""
|
||||
try:
|
||||
|
@ -476,7 +476,7 @@ async def test_fan_update_entity(
|
||||
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE] == 0
|
||||
assert hass.states.get(entity_id).attributes[ATTR_PRESET_MODE] is None
|
||||
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE_STEP] == 100 / 3
|
||||
assert cluster.read_attributes.await_count == 1
|
||||
assert cluster.read_attributes.await_count == 2
|
||||
|
||||
await async_setup_component(hass, "homeassistant", {})
|
||||
await hass.async_block_till_done()
|
||||
@ -486,7 +486,7 @@ async def test_fan_update_entity(
|
||||
)
|
||||
assert hass.states.get(entity_id).state == STATE_OFF
|
||||
assert hass.states.get(entity_id).attributes[ATTR_SPEED] == SPEED_OFF
|
||||
assert cluster.read_attributes.await_count == 2
|
||||
assert cluster.read_attributes.await_count == 3
|
||||
|
||||
cluster.PLUGGED_ATTR_READS = {"fan_mode": 1}
|
||||
await hass.services.async_call(
|
||||
@ -497,4 +497,4 @@ async def test_fan_update_entity(
|
||||
assert hass.states.get(entity_id).attributes[ATTR_SPEED] == SPEED_LOW
|
||||
assert hass.states.get(entity_id).attributes[ATTR_PRESET_MODE] is None
|
||||
assert hass.states.get(entity_id).attributes[ATTR_PERCENTAGE_STEP] == 100 / 3
|
||||
assert cluster.read_attributes.await_count == 3
|
||||
assert cluster.read_attributes.await_count == 4
|
||||
|
@ -16,6 +16,7 @@ from .common import (
|
||||
async_test_rejoin,
|
||||
find_entity_id,
|
||||
send_attributes_report,
|
||||
update_attribute_cache,
|
||||
)
|
||||
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE
|
||||
|
||||
@ -41,25 +42,30 @@ async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_devi
|
||||
|
||||
cluster = zigpy_analog_output_device.endpoints.get(1).analog_output
|
||||
cluster.PLUGGED_ATTR_READS = {
|
||||
"present_value": 15.0,
|
||||
"max_present_value": 100.0,
|
||||
"min_present_value": 0.0,
|
||||
"min_present_value": 1.0,
|
||||
"relinquish_default": 50.0,
|
||||
"resolution": 1.0,
|
||||
"resolution": 1.1,
|
||||
"description": "PWM1",
|
||||
"engineering_units": 98,
|
||||
"application_type": 4 * 0x10000,
|
||||
}
|
||||
update_attribute_cache(cluster)
|
||||
cluster.PLUGGED_ATTR_READS["present_value"] = 15.0
|
||||
|
||||
zha_device = await zha_device_joined_restored(zigpy_analog_output_device)
|
||||
# one for present_value and one for the rest configuration attributes
|
||||
assert cluster.read_attributes.call_count == 2
|
||||
assert "max_present_value" in cluster.read_attributes.call_args[0][0]
|
||||
assert "min_present_value" in cluster.read_attributes.call_args[0][0]
|
||||
assert "relinquish_default" in cluster.read_attributes.call_args[0][0]
|
||||
assert "resolution" in cluster.read_attributes.call_args[0][0]
|
||||
assert "description" in cluster.read_attributes.call_args[0][0]
|
||||
assert "engineering_units" in cluster.read_attributes.call_args[0][0]
|
||||
assert "application_type" in cluster.read_attributes.call_args[0][0]
|
||||
assert cluster.read_attributes.call_count == 3
|
||||
attr_reads = set()
|
||||
for call_args in cluster.read_attributes.call_args_list:
|
||||
attr_reads |= set(call_args[0][0])
|
||||
assert "max_present_value" in attr_reads
|
||||
assert "min_present_value" in attr_reads
|
||||
assert "relinquish_default" in attr_reads
|
||||
assert "resolution" in attr_reads
|
||||
assert "description" in attr_reads
|
||||
assert "engineering_units" in attr_reads
|
||||
assert "application_type" in attr_reads
|
||||
|
||||
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
|
||||
assert entity_id is not None
|
||||
@ -69,18 +75,18 @@ async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_devi
|
||||
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
||||
|
||||
# allow traffic to flow through the gateway and device
|
||||
assert cluster.read_attributes.call_count == 2
|
||||
assert cluster.read_attributes.call_count == 3
|
||||
await async_enable_traffic(hass, [zha_device])
|
||||
await hass.async_block_till_done()
|
||||
assert cluster.read_attributes.call_count == 4
|
||||
assert cluster.read_attributes.call_count == 6
|
||||
|
||||
# test that the state has changed from unavailable to 15.0
|
||||
assert hass.states.get(entity_id).state == "15.0"
|
||||
|
||||
# test attributes
|
||||
assert hass.states.get(entity_id).attributes.get("min") == 0.0
|
||||
assert hass.states.get(entity_id).attributes.get("min") == 1.0
|
||||
assert hass.states.get(entity_id).attributes.get("max") == 100.0
|
||||
assert hass.states.get(entity_id).attributes.get("step") == 1.0
|
||||
assert hass.states.get(entity_id).attributes.get("step") == 1.1
|
||||
assert hass.states.get(entity_id).attributes.get("icon") == "mdi:percent"
|
||||
assert hass.states.get(entity_id).attributes.get("unit_of_measurement") == "%"
|
||||
assert (
|
||||
@ -89,7 +95,7 @@ async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_devi
|
||||
)
|
||||
|
||||
# change value from device
|
||||
assert cluster.read_attributes.call_count == 4
|
||||
assert cluster.read_attributes.call_count == 6
|
||||
await send_attributes_report(hass, cluster, {0x0055: 15})
|
||||
assert hass.states.get(entity_id).state == "15.0"
|
||||
|
||||
@ -111,10 +117,10 @@ async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_devi
|
||||
cluster.PLUGGED_ATTR_READS["present_value"] = 30.0
|
||||
|
||||
# test rejoin
|
||||
assert cluster.read_attributes.call_count == 4
|
||||
assert cluster.read_attributes.call_count == 6
|
||||
await async_test_rejoin(hass, zigpy_analog_output_device, [cluster], (1,))
|
||||
assert hass.states.get(entity_id).state == "30.0"
|
||||
assert cluster.read_attributes.call_count == 6
|
||||
assert cluster.read_attributes.call_count == 9
|
||||
|
||||
# update device value with failed attribute report
|
||||
cluster.PLUGGED_ATTR_READS["present_value"] = 40.0
|
||||
@ -128,5 +134,5 @@ async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_devi
|
||||
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
|
||||
)
|
||||
assert hass.states.get(entity_id).state == "40.0"
|
||||
assert cluster.read_attributes.call_count == 7
|
||||
assert cluster.read_attributes.call_count == 10
|
||||
assert "present_value" in cluster.read_attributes.call_args[0][0]
|
||||
|
Loading…
x
Reference in New Issue
Block a user