ZHA: Support light flashing (#32234)

This commit is contained in:
Robert Chmielowiec 2020-03-01 00:37:06 +01:00 committed by GitHub
parent 3ab04118f6
commit 8e3492d4f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 7 deletions

View File

@ -39,6 +39,7 @@ class Channels:
"""Initialize instance."""
self._pools: List[zha_typing.ChannelPoolType] = []
self._power_config = None
self._identify = None
self._semaphore = asyncio.Semaphore(3)
self._unique_id = str(zha_device.ieee)
self._zdo_channel = base.ZDOChannel(zha_device.device.endpoints[0], zha_device)
@ -60,6 +61,17 @@ class Channels:
if self._power_config is None:
self._power_config = channel
@property
def identify_ch(self) -> zha_typing.ChannelType:
"""Return power configuration channel."""
return self._identify
@identify_ch.setter
def identify_ch(self, channel: zha_typing.ChannelType) -> None:
"""Power configuration channel setter."""
if self._identify is None:
self._identify = channel
@property
def semaphore(self) -> asyncio.Semaphore:
"""Return semaphore for concurrent tasks."""
@ -242,6 +254,8 @@ class ChannelPool:
# on power configuration channel per device
continue
self._channels.power_configuration_ch = channel
elif channel.name == const.CHANNEL_IDENTIFY:
self._channels.identify_ch = channel
self.all_channels[channel.id] = channel

View File

@ -153,7 +153,13 @@ class Groups(ZigbeeChannel):
class Identify(ZigbeeChannel):
"""Identify channel."""
pass
@callback
def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
cmd = parse_and_log_command(self, tsn, command_id, args)
if cmd == "trigger_effect":
self.async_send_signal(f"{self.unique_id}_{cmd}", args[0])
@registries.BINDABLE_CLUSTERS.register(general.LevelControl.cluster_id)

View File

@ -62,6 +62,7 @@ CHANNEL_EVENT_RELAY = "event_relay"
CHANNEL_FAN = "fan"
CHANNEL_HUMIDITY = "humidity"
CHANNEL_IAS_WD = "ias_wd"
CHANNEL_IDENTIFY = "identify"
CHANNEL_ILLUMINANCE = "illuminance"
CHANNEL_LEVEL = ATTR_LEVEL
CHANNEL_MULTISTATE_INPUT = "multistate_input"

View File

@ -24,6 +24,7 @@ from .core.const import (
SIGNAL_SET_LEVEL,
)
from .core.registries import ZHA_ENTITIES
from .core.typing import ZhaDeviceType
from .entity import ZhaEntity
_LOGGER = logging.getLogger(__name__)
@ -37,6 +38,8 @@ UPDATE_COLORLOOP_DIRECTION = 0x2
UPDATE_COLORLOOP_TIME = 0x4
UPDATE_COLORLOOP_HUE = 0x8
FLASH_EFFECTS = {light.FLASH_SHORT: 0x00, light.FLASH_LONG: 0x01}
UNSUPPORTED_ATTRIBUTE = 0x86
SCAN_INTERVAL = timedelta(minutes=60)
STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, light.DOMAIN)
@ -61,7 +64,7 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
class Light(ZhaEntity, light.Light):
"""Representation of a ZHA or ZLL light."""
def __init__(self, unique_id, zha_device, channels, **kwargs):
def __init__(self, unique_id, zha_device: ZhaDeviceType, channels, **kwargs):
"""Initialize the ZHA light."""
super().__init__(unique_id, zha_device, channels, **kwargs)
self._supported_features = 0
@ -74,6 +77,7 @@ class Light(ZhaEntity, light.Light):
self._on_off_channel = self.cluster_channels.get(CHANNEL_ON_OFF)
self._level_channel = self.cluster_channels.get(CHANNEL_LEVEL)
self._color_channel = self.cluster_channels.get(CHANNEL_COLOR)
self._identify_channel = self.zha_device.channels.identify_ch
if self._level_channel:
self._supported_features |= light.SUPPORT_BRIGHTNESS
@ -93,6 +97,9 @@ class Light(ZhaEntity, light.Light):
self._supported_features |= light.SUPPORT_EFFECT
self._effect_list.append(light.EFFECT_COLORLOOP)
if self._identify_channel:
self._supported_features |= light.SUPPORT_FLASH
@property
def is_on(self) -> bool:
"""Return true if entity is on."""
@ -188,6 +195,7 @@ class Light(ZhaEntity, light.Light):
duration = transition * 10 if transition else 0
brightness = kwargs.get(light.ATTR_BRIGHTNESS)
effect = kwargs.get(light.ATTR_EFFECT)
flash = kwargs.get(light.ATTR_FLASH)
if brightness is None and self._off_brightness is not None:
brightness = self._off_brightness
@ -277,6 +285,12 @@ class Light(ZhaEntity, light.Light):
t_log["color_loop_set"] = result
self._effect = None
if flash is not None and self._supported_features & light.SUPPORT_FLASH:
result = await self._identify_channel.trigger_effect(
FLASH_EFFECTS[flash], 0 # effect identifier, effect variant
)
t_log["trigger_effect"] = result
self._off_brightness = None
self.debug("turned on: %s", t_log)
self.async_schedule_update_ha_state()

View File

@ -9,7 +9,8 @@ import zigpy.zcl.clusters.general as general
import zigpy.zcl.clusters.lighting as lighting
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.light import DOMAIN
from homeassistant.components.light import DOMAIN, FLASH_LONG, FLASH_SHORT
from homeassistant.components.zha.light import FLASH_EFFECTS
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
@ -26,7 +27,11 @@ OFF = 0
LIGHT_ON_OFF = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT,
"in_clusters": [general.Basic.cluster_id, general.OnOff.cluster_id],
"in_clusters": [
general.Basic.cluster_id,
general.Identify.cluster_id,
general.OnOff.cluster_id,
],
"out_clusters": [general.Ota.cluster_id],
}
}
@ -48,6 +53,7 @@ LIGHT_COLOR = {
"device_type": zigpy.profiles.zha.DeviceType.COLOR_DIMMABLE_LIGHT,
"in_clusters": [
general.Basic.cluster_id,
general.Identify.cluster_id,
general.LevelControl.cluster_id,
general.OnOff.cluster_id,
lighting.Color.cluster_id,
@ -61,6 +67,10 @@ LIGHT_COLOR = {
"zigpy.zcl.clusters.lighting.Color.request",
new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]),
)
@asynctest.patch(
"zigpy.zcl.clusters.general.Identify.request",
new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]),
)
@asynctest.patch(
"zigpy.zcl.clusters.general.LevelControl.request",
new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]),
@ -88,6 +98,7 @@ async def test_light(
cluster_on_off = zigpy_device.endpoints[1].on_off
cluster_level = getattr(zigpy_device.endpoints[1], "level", None)
cluster_color = getattr(zigpy_device.endpoints[1], "light_color", None)
cluster_identify = getattr(zigpy_device.endpoints[1], "identify", None)
# test that the lights were created and that they are unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
@ -104,6 +115,11 @@ async def test_light(
# test turning the lights on and off from the HA
await async_test_on_off_from_hass(hass, cluster_on_off, entity_id)
# test short flashing the lights from the HA
if cluster_identify:
await async_test_flash_from_hass(hass, cluster_identify, entity_id, FLASH_SHORT)
# test turning the lights on and off from the HA
if cluster_level:
await async_test_level_on_off_from_hass(
hass, cluster_on_off, cluster_level, entity_id
@ -124,6 +140,10 @@ async def test_light(
clusters.append(cluster_color)
await async_test_rejoin(hass, zigpy_device, clusters, reporting)
# test long flashing the lights from the HA
if cluster_identify:
await async_test_flash_from_hass(hass, cluster_identify, entity_id, FLASH_LONG)
async def async_test_on_off_from_light(hass, cluster, entity_id):
"""Test on off functionality from the light."""
@ -197,7 +217,7 @@ async def async_test_level_on_off_from_hass(
assert level_cluster.request.call_count == 0
assert level_cluster.request.await_count == 0
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None
False, ON, (), expect_reply=True, manufacturer=None
)
on_off_cluster.request.reset_mock()
level_cluster.request.reset_mock()
@ -210,7 +230,7 @@ async def async_test_level_on_off_from_hass(
assert level_cluster.request.call_count == 1
assert level_cluster.request.await_count == 1
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None
False, ON, (), expect_reply=True, manufacturer=None
)
assert level_cluster.request.call_args == call(
False,
@ -232,7 +252,7 @@ async def async_test_level_on_off_from_hass(
assert level_cluster.request.call_count == 1
assert level_cluster.request.await_count == 1
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None
False, ON, (), expect_reply=True, manufacturer=None
)
assert level_cluster.request.call_args == call(
False,
@ -260,3 +280,23 @@ async def async_test_dimmer_from_light(hass, cluster, entity_id, level, expected
if level == 0:
level = None
assert hass.states.get(entity_id).attributes.get("brightness") == level
async def async_test_flash_from_hass(hass, cluster, entity_id, flash):
"""Test flash functionality from hass."""
# turn on via UI
cluster.request.reset_mock()
await hass.services.async_call(
DOMAIN, "turn_on", {"entity_id": entity_id, "flash": flash}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.await_count == 1
assert cluster.request.call_args == call(
False,
64,
(zigpy.types.uint8_t, zigpy.types.uint8_t),
FLASH_EFFECTS[flash],
0,
expect_reply=True,
manufacturer=None,
)