Add ZHA Coordinator to LightLink cluster groups (#43959)

* Add coordinator to LighLink cluster groups

* Make pylint happy
This commit is contained in:
Alexei Chetroi 2020-12-05 11:20:10 -05:00 committed by GitHub
parent 9b7ecddde6
commit 40e5634db3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 12 deletions

View File

@ -1,11 +1,41 @@
"""Lightlink channels module for Zigbee Home Automation."""
import asyncio
import zigpy.exceptions
import zigpy.zcl.clusters.lightlink as lightlink
from .. import registries
from .base import ZigbeeChannel
from .base import ChannelStatus, ZigbeeChannel
@registries.CHANNEL_ONLY_CLUSTERS.register(lightlink.LightLink.cluster_id)
@registries.ZIGBEE_CHANNEL_REGISTRY.register(lightlink.LightLink.cluster_id)
class LightLink(ZigbeeChannel):
"""Lightlink channel."""
async def async_configure(self) -> None:
"""Add Coordinator to LightLink group ."""
if self._ch_pool.skip_configuration:
self._status = ChannelStatus.CONFIGURED
return
application = self._ch_pool.endpoint.device.application
try:
coordinator = application.get_device(application.ieee)
except KeyError:
self.warning("Aborting - unable to locate required coordinator device.")
return
try:
_, _, groups = await self.cluster.get_group_identifiers(0)
except (zigpy.exceptions.ZigbeeException, asyncio.TimeoutError) as exc:
self.warning("Couldn't get list of groups: %s", str(exc))
return
if groups:
for group in groups:
self.debug("Adding coordinator to 0x%04x group id", group.group_id)
await coordinator.add_to_group(group.group_id)
else:
await coordinator.add_to_group(0x0000, name="Default Lightlink Group")

View File

@ -14,7 +14,7 @@ import homeassistant.components.zha.core.registries as registries
from .common import get_zha_gateway, make_zcl_header
import tests.async_mock
from tests.async_mock import AsyncMock, patch
from tests.common import async_capture_events
@ -38,9 +38,26 @@ async def zha_gateway(hass, setup_zha):
@pytest.fixture
def channel_pool():
def zigpy_coordinator_device(zigpy_device_mock):
"""Coordinator device fixture."""
coordinator = zigpy_device_mock(
{1: {"in_clusters": [0x1000], "out_clusters": [], "device_type": 0x1234}},
"00:11:22:33:44:55:66:77",
"test manufacturer",
"test model",
)
with patch.object(coordinator, "add_to_group", AsyncMock(return_value=[0])):
yield coordinator
@pytest.fixture
def channel_pool(zigpy_coordinator_device):
"""Endpoint Channels fixture."""
ch_pool_mock = mock.MagicMock(spec_set=zha_channels.ChannelPool)
ch_pool_mock.endpoint.device.application.get_device.return_value = (
zigpy_coordinator_device
)
type(ch_pool_mock).skip_configuration = mock.PropertyMock(return_value=False)
ch_pool_mock.id = 1
return ch_pool_mock
@ -117,7 +134,6 @@ async def poll_control_device(zha_device_restored, zigpy_device_mock):
(0x0406, 1, {"occupancy"}),
(0x0702, 1, {"instantaneous_demand"}),
(0x0B04, 1, {"active_power"}),
(0x1000, 1, {}),
],
)
async def test_in_channel_config(
@ -174,7 +190,6 @@ async def test_in_channel_config(
(0x0406, 1),
(0x0702, 1),
(0x0B04, 1),
(0x1000, 1),
],
)
async def test_out_channel_config(
@ -386,12 +401,12 @@ async def test_ep_channels_configure(channel):
ch_1 = channel(zha_const.CHANNEL_ON_OFF, 6)
ch_2 = channel(zha_const.CHANNEL_LEVEL, 8)
ch_3 = channel(zha_const.CHANNEL_COLOR, 768)
ch_3.async_configure = tests.async_mock.AsyncMock(side_effect=asyncio.TimeoutError)
ch_3.async_initialize = tests.async_mock.AsyncMock(side_effect=asyncio.TimeoutError)
ch_3.async_configure = AsyncMock(side_effect=asyncio.TimeoutError)
ch_3.async_initialize = AsyncMock(side_effect=asyncio.TimeoutError)
ch_4 = channel(zha_const.CHANNEL_ON_OFF, 6)
ch_5 = channel(zha_const.CHANNEL_LEVEL, 8)
ch_5.async_configure = tests.async_mock.AsyncMock(side_effect=asyncio.TimeoutError)
ch_5.async_initialize = tests.async_mock.AsyncMock(side_effect=asyncio.TimeoutError)
ch_5.async_configure = AsyncMock(side_effect=asyncio.TimeoutError)
ch_5.async_initialize = AsyncMock(side_effect=asyncio.TimeoutError)
channels = mock.MagicMock(spec_set=zha_channels.Channels)
type(channels).semaphore = mock.PropertyMock(return_value=asyncio.Semaphore(3))
@ -427,8 +442,8 @@ async def test_poll_control_configure(poll_control_ch):
async def test_poll_control_checkin_response(poll_control_ch):
"""Test poll control channel checkin response."""
rsp_mock = tests.async_mock.AsyncMock()
set_interval_mock = tests.async_mock.AsyncMock()
rsp_mock = AsyncMock()
set_interval_mock = AsyncMock()
cluster = poll_control_ch.cluster
patch_1 = mock.patch.object(cluster, "checkin_response", rsp_mock)
patch_2 = mock.patch.object(cluster, "set_long_poll_interval", set_interval_mock)
@ -449,7 +464,7 @@ async def test_poll_control_checkin_response(poll_control_ch):
async def test_poll_control_cluster_command(hass, poll_control_device):
"""Test poll control channel response to cluster command."""
checkin_mock = tests.async_mock.AsyncMock()
checkin_mock = AsyncMock()
poll_control_ch = poll_control_device.channels.pools[0].all_channels["1:0x0020"]
cluster = poll_control_ch.cluster
events = async_capture_events(hass, "zha_event")
@ -474,3 +489,60 @@ async def test_poll_control_cluster_command(hass, poll_control_device):
assert data["args"][2] is mock.sentinel.args3
assert data["unique_id"] == "00:11:22:33:44:55:66:77:1:0x0020"
assert data["device_id"] == poll_control_device.device_id
@pytest.fixture
def zigpy_zll_device(zigpy_device_mock):
"""ZLL device fixture."""
return zigpy_device_mock(
{1: {"in_clusters": [0x1000], "out_clusters": [], "device_type": 0x1234}},
"00:11:22:33:44:55:66:77",
"test manufacturer",
"test model",
)
async def test_zll_device_groups(
zigpy_zll_device, channel_pool, zigpy_coordinator_device
):
"""Test adding coordinator to ZLL groups."""
cluster = zigpy_zll_device.endpoints[1].lightlink
channel = zha_channels.lightlink.LightLink(cluster, channel_pool)
with patch.object(
cluster, "command", AsyncMock(return_value=[1, 0, []])
) as cmd_mock:
await channel.async_configure()
assert cmd_mock.await_count == 1
assert (
cluster.server_commands[cmd_mock.await_args[0][0]][0]
== "get_group_identifiers"
)
assert cluster.bind.call_count == 0
assert zigpy_coordinator_device.add_to_group.await_count == 1
assert zigpy_coordinator_device.add_to_group.await_args[0][0] == 0x0000
zigpy_coordinator_device.add_to_group.reset_mock()
group_1 = zigpy.zcl.clusters.lightlink.GroupInfoRecord(0xABCD, 0x00)
group_2 = zigpy.zcl.clusters.lightlink.GroupInfoRecord(0xAABB, 0x00)
with patch.object(
cluster, "command", AsyncMock(return_value=[1, 0, [group_1, group_2]])
) as cmd_mock:
await channel.async_configure()
assert cmd_mock.await_count == 1
assert (
cluster.server_commands[cmd_mock.await_args[0][0]][0]
== "get_group_identifiers"
)
assert cluster.bind.call_count == 0
assert zigpy_coordinator_device.add_to_group.await_count == 2
assert (
zigpy_coordinator_device.add_to_group.await_args_list[0][0][0]
== group_1.group_id
)
assert (
zigpy_coordinator_device.add_to_group.await_args_list[1][0][0]
== group_2.group_id
)