Add additional configuration entities for ZHA lights (#70597)

* Add more configuration entities for ZHA lights

* fix typing circular imports

* enhance filter in entity factory

* fix read attribute chunking

* add test

* add exception test
This commit is contained in:
David F. Mulcahey 2022-05-16 13:04:32 -04:00 committed by GitHub
parent 32b3ce5727
commit 57c94c0350
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 341 additions and 7 deletions

View File

@ -446,7 +446,7 @@ class ZigbeeChannel(LogMixin):
try: try:
self.debug("Reading attributes in chunks: %s", chunk) self.debug("Reading attributes in chunks: %s", chunk)
read, _ = await self.cluster.read_attributes( read, _ = await self.cluster.read_attributes(
attributes, chunk,
allow_cache=from_cache, allow_cache=from_cache,
only_cache=only_cache, only_cache=only_cache,
manufacturer=manufacturer, manufacturer=manufacturer,

View File

@ -222,6 +222,14 @@ class LevelControlChannel(ZigbeeChannel):
CURRENT_LEVEL = 0 CURRENT_LEVEL = 0
REPORT_CONFIG = ({"attr": "current_level", "config": REPORT_CONFIG_ASAP},) REPORT_CONFIG = ({"attr": "current_level", "config": REPORT_CONFIG_ASAP},)
ZCL_INIT_ATTRS = {
"on_off_transition_time": True,
"on_level": True,
"on_transition_time": True,
"off_transition_time": True,
"default_move_rate": True,
"start_up_current_level": True,
}
@property @property
def current_level(self) -> int | None: def current_level(self) -> int | None:

View File

@ -1,17 +1,25 @@
"""Support for ZHA AnalogOutput cluster.""" """Support for ZHA AnalogOutput cluster."""
from __future__ import annotations
import functools import functools
import logging import logging
from typing import TYPE_CHECKING
import zigpy.exceptions
from zigpy.zcl.foundation import Status
from homeassistant.components.number import NumberEntity from homeassistant.components.number import NumberEntity
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import EntityCategory
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .core import discovery from .core import discovery
from .core.const import ( from .core.const import (
CHANNEL_ANALOG_OUTPUT, CHANNEL_ANALOG_OUTPUT,
CHANNEL_LEVEL,
DATA_ZHA, DATA_ZHA,
SIGNAL_ADD_ENTITIES, SIGNAL_ADD_ENTITIES,
SIGNAL_ATTR_UPDATED, SIGNAL_ATTR_UPDATED,
@ -19,9 +27,16 @@ from .core.const import (
from .core.registries import ZHA_ENTITIES from .core.registries import ZHA_ENTITIES
from .entity import ZhaEntity from .entity import ZhaEntity
if TYPE_CHECKING:
from .core.channels.base import ZigbeeChannel
from .core.device import ZHADevice
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, Platform.NUMBER) STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, Platform.NUMBER)
CONFIG_DIAGNOSTIC_MATCH = functools.partial(
ZHA_ENTITIES.config_diagnostic_match, Platform.NUMBER
)
UNITS = { UNITS = {
@ -342,3 +357,141 @@ class ZhaNumber(ZhaEntity, NumberEntity):
"present_value", from_cache=False "present_value", from_cache=False
) )
_LOGGER.debug("read value=%s", value) _LOGGER.debug("read value=%s", value)
class ZHANumberConfigurationEntity(ZhaEntity, NumberEntity):
"""Representation of a ZHA number configuration entity."""
_attr_entity_category = EntityCategory.CONFIG
_attr_step: float = 1.0
_zcl_attribute: str
@classmethod
def create_entity(
cls,
unique_id: str,
zha_device: ZHADevice,
channels: list[ZigbeeChannel],
**kwargs,
) -> ZhaEntity | None:
"""Entity Factory.
Return entity if it is a supported configuration, otherwise return None
"""
channel = channels[0]
if (
cls._zcl_attribute in channel.cluster.unsupported_attributes
or channel.cluster.get(cls._zcl_attribute) is None
):
_LOGGER.debug(
"%s is not supported - skipping %s entity creation",
cls._zcl_attribute,
cls.__name__,
)
return None
return cls(unique_id, zha_device, channels, **kwargs)
def __init__(
self,
unique_id: str,
zha_device: ZHADevice,
channels: list[ZigbeeChannel],
**kwargs,
) -> None:
"""Init this number configuration entity."""
self._channel: ZigbeeChannel = channels[0]
super().__init__(unique_id, zha_device, channels, **kwargs)
@property
def value(self) -> float:
"""Return the current value."""
return self._channel.cluster.get(self._zcl_attribute)
async def async_set_value(self, value: float) -> None:
"""Update the current value from HA."""
try:
res = await self._channel.cluster.write_attributes(
{self._zcl_attribute: int(value)}
)
except zigpy.exceptions.ZigbeeException as ex:
self.error("Could not set value: %s", ex)
return
if not isinstance(res, Exception) and all(
record.status == Status.SUCCESS for record in res[0]
):
self.async_write_ha_state()
async def async_update(self) -> None:
"""Attempt to retrieve the state of the entity."""
await super().async_update()
_LOGGER.debug("polling current state")
if self._channel:
value = await self._channel.get_attribute_value(
self._zcl_attribute, from_cache=False
)
_LOGGER.debug("read value=%s", value)
@CONFIG_DIAGNOSTIC_MATCH(channel_names=CHANNEL_LEVEL)
class OnOffTransitionTimeConfigurationEntity(
ZHANumberConfigurationEntity, id_suffix="on_off_transition_time"
):
"""Representation of a ZHA on off transition time configuration entity."""
_attr_min_value: float = 0x0000
_attr_max_value: float = 0xFFFF
_zcl_attribute: str = "on_off_transition_time"
@CONFIG_DIAGNOSTIC_MATCH(channel_names=CHANNEL_LEVEL)
class OnLevelConfigurationEntity(ZHANumberConfigurationEntity, id_suffix="on_level"):
"""Representation of a ZHA on level configuration entity."""
_attr_min_value: float = 0x00
_attr_max_value: float = 0xFF
_zcl_attribute: str = "on_level"
@CONFIG_DIAGNOSTIC_MATCH(channel_names=CHANNEL_LEVEL)
class OnTransitionTimeConfigurationEntity(
ZHANumberConfigurationEntity, id_suffix="on_transition_time"
):
"""Representation of a ZHA on transition time configuration entity."""
_attr_min_value: float = 0x0000
_attr_max_value: float = 0xFFFE
_zcl_attribute: str = "on_transition_time"
@CONFIG_DIAGNOSTIC_MATCH(channel_names=CHANNEL_LEVEL)
class OffTransitionTimeConfigurationEntity(
ZHANumberConfigurationEntity, id_suffix="off_transition_time"
):
"""Representation of a ZHA off transition time configuration entity."""
_attr_min_value: float = 0x0000
_attr_max_value: float = 0xFFFE
_zcl_attribute: str = "off_transition_time"
@CONFIG_DIAGNOSTIC_MATCH(channel_names=CHANNEL_LEVEL)
class DefaultMoveRateConfigurationEntity(
ZHANumberConfigurationEntity, id_suffix="default_move_rate"
):
"""Representation of a ZHA default move rate configuration entity."""
_attr_min_value: float = 0x00
_attr_max_value: float = 0xFE
_zcl_attribute: str = "default_move_rate"
@CONFIG_DIAGNOSTIC_MATCH(channel_names=CHANNEL_LEVEL)
class StartUpCurrentLevelConfigurationEntity(
ZHANumberConfigurationEntity, id_suffix="start_up_current_level"
):
"""Representation of a ZHA startup current level configuration entity."""
_attr_min_value: float = 0x00
_attr_max_value: float = 0xFF
_zcl_attribute: str = "start_up_current_level"

View File

@ -44,7 +44,15 @@ from .zha_devices_list import (
NO_TAIL_ID = re.compile("_\\d$") NO_TAIL_ID = re.compile("_\\d$")
UNIQUE_ID_HD = re.compile(r"^(([\da-fA-F]{2}:){7}[\da-fA-F]{2}-\d{1,3})", re.X) UNIQUE_ID_HD = re.compile(r"^(([\da-fA-F]{2}:){7}[\da-fA-F]{2}-\d{1,3})", re.X)
IGNORE_SUFFIXES = [zigpy.zcl.clusters.general.OnOff.StartUpOnOff.__name__] IGNORE_SUFFIXES = [
zigpy.zcl.clusters.general.OnOff.StartUpOnOff.__name__,
"on_off_transition_time",
"on_level",
"on_transition_time",
"off_transition_time",
"default_move_rate",
"start_up_current_level",
]
def contains_ignored_suffix(unique_id: str) -> bool: def contains_ignored_suffix(unique_id: str) -> bool:

View File

@ -2,13 +2,14 @@
from unittest.mock import call, patch from unittest.mock import call, patch
import pytest import pytest
import zigpy.profiles.zha from zigpy.exceptions import ZigbeeException
import zigpy.types from zigpy.profiles import zha
import zigpy.zcl.clusters.general as general import zigpy.zcl.clusters.general as general
import zigpy.zcl.foundation as zcl_f import zigpy.zcl.foundation as zcl_f
from homeassistant.components.number import DOMAIN as NUMBER_DOMAIN from homeassistant.components.number import DOMAIN as NUMBER_DOMAIN
from homeassistant.const import STATE_UNAVAILABLE, Platform from homeassistant.const import ENTITY_CATEGORY_CONFIG, STATE_UNAVAILABLE, Platform
from homeassistant.helpers import entity_registry as er
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from .common import ( from .common import (
@ -18,7 +19,7 @@ from .common import (
send_attributes_report, send_attributes_report,
update_attribute_cache, update_attribute_cache,
) )
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
from tests.common import mock_coro from tests.common import mock_coro
@ -29,7 +30,7 @@ def zigpy_analog_output_device(zigpy_device_mock):
endpoints = { endpoints = {
1: { 1: {
SIG_EP_TYPE: zigpy.profiles.zha.DeviceType.LEVEL_CONTROL_SWITCH, SIG_EP_TYPE: zha.DeviceType.LEVEL_CONTROL_SWITCH,
SIG_EP_INPUT: [general.AnalogOutput.cluster_id, general.Basic.cluster_id], SIG_EP_INPUT: [general.AnalogOutput.cluster_id, general.Basic.cluster_id],
SIG_EP_OUTPUT: [], SIG_EP_OUTPUT: [],
} }
@ -37,6 +38,30 @@ def zigpy_analog_output_device(zigpy_device_mock):
return zigpy_device_mock(endpoints) return zigpy_device_mock(endpoints)
@pytest.fixture
async def light(zigpy_device_mock):
"""Siren fixture."""
zigpy_device = zigpy_device_mock(
{
1: {
SIG_EP_PROFILE: zha.PROFILE_ID,
SIG_EP_TYPE: zha.DeviceType.ON_OFF_LIGHT,
SIG_EP_INPUT: [
general.Basic.cluster_id,
general.Identify.cluster_id,
general.OnOff.cluster_id,
general.LevelControl.cluster_id,
],
SIG_EP_OUTPUT: [general.Ota.cluster_id],
}
},
node_descriptor=b"\x02@\x84_\x11\x7fd\x00\x00,d\x00\x00",
)
return zigpy_device
async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_device): async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_device):
"""Test zha number platform.""" """Test zha number platform."""
@ -139,3 +164,143 @@ async def test_number(hass, zha_device_joined_restored, zigpy_analog_output_devi
assert hass.states.get(entity_id).state == "40.0" assert hass.states.get(entity_id).state == "40.0"
assert cluster.read_attributes.call_count == 10 assert cluster.read_attributes.call_count == 10
assert "present_value" in cluster.read_attributes.call_args[0][0] assert "present_value" in cluster.read_attributes.call_args[0][0]
@pytest.mark.parametrize(
"attr, initial_value, new_value",
(
("on_off_transition_time", 20, 5),
("on_level", 255, 50),
("on_transition_time", 5, 1),
("off_transition_time", 5, 1),
("default_move_rate", 1, 5),
("start_up_current_level", 254, 125),
),
)
async def test_level_control_number(
hass, light, zha_device_joined, attr, initial_value, new_value
):
"""Test zha level control number entities - new join."""
entity_registry = er.async_get(hass)
level_control_cluster = light.endpoints[1].level
level_control_cluster.PLUGGED_ATTR_READS = {
attr: initial_value,
}
zha_device = await zha_device_joined(light)
entity_id = await find_entity_id(
Platform.NUMBER,
zha_device,
hass,
qualifier=attr,
)
assert entity_id is not None
assert level_control_cluster.read_attributes.call_count == 3
assert (
call(
[
"on_off_transition_time",
"on_level",
"on_transition_time",
"off_transition_time",
"default_move_rate",
],
allow_cache=True,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
assert (
call(
["start_up_current_level"],
allow_cache=True,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
assert (
call(
[
"current_level",
],
allow_cache=False,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
state = hass.states.get(entity_id)
assert state
assert state.state == str(initial_value)
entity_entry = entity_registry.async_get(entity_id)
assert entity_entry
assert entity_entry.entity_category == ENTITY_CATEGORY_CONFIG
# Test number set_value
await hass.services.async_call(
"number",
"set_value",
{
"entity_id": entity_id,
"value": new_value,
},
blocking=True,
)
assert level_control_cluster.write_attributes.call_count == 1
assert level_control_cluster.write_attributes.call_args[0][0] == {
attr: new_value,
}
state = hass.states.get(entity_id)
assert state
assert state.state == str(new_value)
level_control_cluster.read_attributes.reset_mock()
await async_setup_component(hass, "homeassistant", {})
await hass.async_block_till_done()
await hass.services.async_call(
"homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True
)
# the mocking doesn't update the attr cache so this flips back to initial value
assert hass.states.get(entity_id).state == str(initial_value)
assert level_control_cluster.read_attributes.call_count == 1
assert (
call(
[
attr,
],
allow_cache=False,
only_cache=False,
manufacturer=None,
)
in level_control_cluster.read_attributes.call_args_list
)
level_control_cluster.write_attributes.reset_mock()
level_control_cluster.write_attributes.side_effect = ZigbeeException
await hass.services.async_call(
"number",
"set_value",
{
"entity_id": entity_id,
"value": new_value,
},
blocking=True,
)
assert level_control_cluster.write_attributes.call_count == 1
assert level_control_cluster.write_attributes.call_args[0][0] == {
attr: new_value,
}
assert hass.states.get(entity_id).state == str(initial_value)