From 28eeed1db389c3dda138663f54401a9ce8b8bd64 Mon Sep 17 00:00:00 2001 From: Alexei Chetroi Date: Sun, 9 Feb 2020 21:45:35 -0500 Subject: [PATCH] ZHA tests refactoring (#31682) * Fixtures for restoring/joning a device. * binary_sensor.zha tests. * cover.zha platform tests. * device_tracker.zha platform tests. * fan.zha platform tests. * switch.zha platform tests. * Update light.zha platform tests. * Update sensor.zha platform tests. * ZHA api tests refactoring. * Update lock.zha platform tests. * Update ZHA gateway tests. * Update zha device action tests. * Update zha device trigger tests. * Cleanup. --- tests/components/zha/common.py | 94 +------ tests/components/zha/conftest.py | 93 ++++--- tests/components/zha/test_api.py | 114 +++++---- tests/components/zha/test_binary_sensor.py | 128 +++++----- tests/components/zha/test_cover.py | 54 ++-- tests/components/zha/test_device_action.py | 70 ++--- tests/components/zha/test_device_tracker.py | 69 ++--- tests/components/zha/test_device_trigger.py | 103 +++----- tests/components/zha/test_fan.py | 35 +-- tests/components/zha/test_gateway.py | 46 ++-- tests/components/zha/test_light.py | 237 ++++++++--------- tests/components/zha/test_lock.py | 36 +-- tests/components/zha/test_sensor.py | 268 +++++++++----------- tests/components/zha/test_switch.py | 42 +-- 14 files changed, 648 insertions(+), 741 deletions(-) diff --git a/tests/components/zha/common.py b/tests/components/zha/common.py index 9b6a8b5b55f..97668bef2ea 100644 --- a/tests/components/zha/common.py +++ b/tests/components/zha/common.py @@ -1,6 +1,6 @@ """Common test objects.""" import time -from unittest.mock import Mock, patch +from unittest.mock import Mock from asynctest import CoroutineMock import zigpy.profiles.zha @@ -18,8 +18,6 @@ from homeassistant.components.zha.core.const import ( ) from homeassistant.util import slugify -from tests.common import mock_coro - class FakeApplication: """Fake application for mocking zigpy.""" @@ -100,66 +98,6 @@ class FakeDevice: self.node_desc = zigpy.zdo.types.NodeDescriptor.deserialize(node_desc)[0] -def make_device(endpoints, ieee, manufacturer, model): - """Make a fake device using the specified cluster classes.""" - device = FakeDevice(ieee, manufacturer, model) - for epid, ep in endpoints.items(): - endpoint = FakeEndpoint(manufacturer, model, epid) - endpoint.device = device - device.endpoints[epid] = endpoint - endpoint.device_type = ep["device_type"] - profile_id = ep.get("profile_id") - if profile_id: - endpoint.profile_id = profile_id - - for cluster_id in ep.get("in_clusters", []): - endpoint.add_input_cluster(cluster_id) - - for cluster_id in ep.get("out_clusters", []): - endpoint.add_output_cluster(cluster_id) - - return device - - -async def async_init_zigpy_device( - hass, - in_cluster_ids, - out_cluster_ids, - device_type, - gateway, - ieee="00:0d:6f:00:0a:90:69:e7", - manufacturer="FakeManufacturer", - model="FakeModel", - is_new_join=False, -): - """Create and initialize a device. - - This creates a fake device and adds it to the "network". It can be used to - test existing device functionality and new device pairing functionality. - The is_new_join parameter influences whether or not the device will go - through cluster binding and zigbee cluster configure reporting. That only - happens when the device is paired to the network for the first time. - """ - device = make_device( - { - 1: { - "in_clusters": in_cluster_ids, - "out_clusters": out_cluster_ids, - "device_type": device_type, - } - }, - ieee, - manufacturer, - model, - ) - if is_new_join: - await gateway.async_device_initialized(device) - else: - await gateway.async_device_restored(device) - await hass.async_block_till_done() - return device - - def make_attribute(attrid, value, status=0): """Make an attribute.""" attr = zcl_f.Attribute() @@ -202,36 +140,6 @@ async def async_enable_traffic(hass, zha_gateway, zha_devices): await hass.async_block_till_done() -async def async_test_device_join( - hass, zha_gateway, cluster_id, entity_id, device_type=None -): - """Test a newly joining device. - - This creates a new fake device and adds it to the network. It is meant to - simulate pairing a new device to the network so that code pathways that - only trigger during device joins can be tested. - """ - # create zigpy device mocking out the zigbee network operations - with patch( - "zigpy.zcl.Cluster.configure_reporting", - return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]), - ): - with patch( - "zigpy.zcl.Cluster.bind", - return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]), - ): - await async_init_zigpy_device( - hass, - [cluster_id, zigpy.zcl.clusters.general.Basic.cluster_id], - [], - device_type, - zha_gateway, - ieee="00:0d:6f:00:0a:90:69:f7", - is_new_join=True, - ) - assert hass.states.get(entity_id) is not None - - def make_zcl_header(command_id: int, global_command: bool = True) -> zcl_f.ZCLHeader: """Cluster.handle_message() ZCL Header helper.""" if global_command: diff --git a/tests/components/zha/conftest.py b/tests/components/zha/conftest.py index 18344172d29..f54c6ca8602 100644 --- a/tests/components/zha/conftest.py +++ b/tests/components/zha/conftest.py @@ -1,4 +1,5 @@ """Test configuration for the ZHA component.""" +import functools from unittest import mock from unittest.mock import patch @@ -7,7 +8,6 @@ import pytest import zigpy from zigpy.application import ControllerApplication -from homeassistant import config_entries from homeassistant.components.zha.core.const import COMPONENTS, DATA_ZHA, DOMAIN from homeassistant.components.zha.core.gateway import ZHAGateway from homeassistant.components.zha.core.store import async_get_registry @@ -15,27 +15,39 @@ from homeassistant.helpers.device_registry import async_get_registry as get_dev_ from .common import FakeDevice, FakeEndpoint, async_setup_entry +from tests.common import MockConfigEntry + FIXTURE_GRP_ID = 0x1001 FIXTURE_GRP_NAME = "fixture group" @pytest.fixture(name="config_entry") -def config_entry_fixture(hass): +async def config_entry_fixture(hass): """Fixture representing a config entry.""" - config_entry = config_entries.ConfigEntry( - 1, - DOMAIN, - "Mock Title", - {}, - "test", - config_entries.CONN_CLASS_LOCAL_PUSH, - system_options={}, - ) + config_entry = MockConfigEntry(domain=DOMAIN) + config_entry.add_to_hass(hass) return config_entry +@pytest.fixture +async def setup_zha(hass, config_entry): + """Load the ZHA component. + + This will init the ZHA component. It loads the component in HA so that + we can test the domains that ZHA supports without actually having a zigbee + network running. + """ + # this prevents needing an actual radio and zigbee network available + with patch("homeassistant.components.zha.async_setup_entry", async_setup_entry): + hass.data[DATA_ZHA] = {} + + # init ZHA + await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) + await hass.async_block_till_done() + + @pytest.fixture(name="zha_gateway") -async def zha_gateway_fixture(hass, config_entry): +async def zha_gateway_fixture(hass, config_entry, setup_zha): """Fixture representing a zha gateway. Create a ZHAGateway object that can be used to interact with as if we @@ -57,23 +69,6 @@ async def zha_gateway_fixture(hass, config_entry): return gateway -@pytest.fixture(autouse=True) -async def setup_zha(hass, config_entry): - """Load the ZHA component. - - This will init the ZHA component. It loads the component in HA so that - we can test the domains that ZHA supports without actually having a zigbee - network running. - """ - # this prevents needing an actual radio and zigbee network available - with patch("homeassistant.components.zha.async_setup_entry", async_setup_entry): - hass.data[DATA_ZHA] = {} - - # init ZHA - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - - @pytest.fixture def channel(): """Channel mock factory fixture.""" @@ -121,3 +116,43 @@ def zigpy_device_mock(): return device return _mock_dev + + +@pytest.fixture +def _zha_device_restored_or_joined(hass, zha_gateway, config_entry): + """Make a restored or joined ZHA devices.""" + + async def _zha_device(is_new_join, zigpy_dev): + if is_new_join: + for cmp in COMPONENTS: + await hass.config_entries.async_forward_entry_setup(config_entry, cmp) + await hass.async_block_till_done() + await zha_gateway.async_device_initialized(zigpy_dev) + else: + await zha_gateway.async_device_restored(zigpy_dev) + for cmp in COMPONENTS: + await hass.config_entries.async_forward_entry_setup(config_entry, cmp) + await hass.async_block_till_done() + return zha_gateway.get_device(zigpy_dev.ieee) + + return _zha_device + + +@pytest.fixture +def zha_device_joined(_zha_device_restored_or_joined): + """Return a newly joined ZHA device.""" + + return functools.partial(_zha_device_restored_or_joined, True) + + +@pytest.fixture +def zha_device_restored(_zha_device_restored_or_joined): + """Return a restored ZHA device.""" + + return functools.partial(_zha_device_restored_or_joined, False) + + +@pytest.fixture(params=["zha_device_joined", "zha_device_restored"]) +def zha_device_joined_restored(request): + """Join or restore ZHA device.""" + return request.getfixturevalue(request.param) diff --git a/tests/components/zha/test_api.py b/tests/components/zha/test_api.py index f01d27eb167..c61ecfa8c71 100644 --- a/tests/components/zha/test_api.py +++ b/tests/components/zha/test_api.py @@ -1,11 +1,9 @@ """Test ZHA API.""" import pytest -import zigpy +import zigpy.profiles.zha import zigpy.zcl.clusters.general as general -from homeassistant.components.light import DOMAIN as light_domain -from homeassistant.components.switch import DOMAIN from homeassistant.components.websocket_api import const from homeassistant.components.zha.api import ID, TYPE, async_load_api from homeassistant.components.zha.core.const import ( @@ -23,50 +21,67 @@ from homeassistant.components.zha.core.const import ( GROUP_NAME, ) -from .common import async_init_zigpy_device from .conftest import FIXTURE_GRP_ID, FIXTURE_GRP_NAME +IEEE_SWITCH_DEVICE = "01:2d:6f:00:0a:90:69:e7" +IEEE_GROUPABLE_DEVICE = "01:2d:6f:00:0a:90:69:e8" + @pytest.fixture -async def zha_client(hass, config_entry, zha_gateway, hass_ws_client): +async def device_switch(hass, zha_gateway, zigpy_device_mock, zha_device_joined): + """Test zha switch platform.""" + + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [general.OnOff.cluster_id, general.Basic.cluster_id], + "out_clusters": [], + "device_type": zigpy.profiles.zha.DeviceType.ON_OFF_SWITCH, + } + }, + ieee=IEEE_SWITCH_DEVICE, + ) + zha_device = await zha_device_joined(zigpy_device) + zha_device.set_available(True) + return zha_device + + +@pytest.fixture +async def device_groupable(hass, zha_gateway, zigpy_device_mock, zha_device_joined): + """Test zha light platform.""" + + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [ + general.OnOff.cluster_id, + general.Basic.cluster_id, + general.Groups.cluster_id, + ], + "out_clusters": [], + "device_type": zigpy.profiles.zha.DeviceType.ON_OFF_SWITCH, + } + }, + ieee=IEEE_GROUPABLE_DEVICE, + ) + zha_device = await zha_device_joined(zigpy_device) + zha_device.set_available(True) + return zha_device + + +@pytest.fixture +async def zha_client(hass, hass_ws_client, device_switch, device_groupable): """Test zha switch platform.""" # load the ZHA API async_load_api(hass) - - # create zigpy device - await async_init_zigpy_device( - hass, - [general.OnOff.cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, - ) - - await async_init_zigpy_device( - hass, - [general.OnOff.cluster_id, general.Basic.cluster_id, general.Groups.cluster_id], - [], - zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT, - zha_gateway, - manufacturer="FakeGroupManufacturer", - model="FakeGroupModel", - ieee="01:2d:6f:00:0a:90:69:e8", - ) - - # load up switch domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - await hass.config_entries.async_forward_entry_setup(config_entry, light_domain) - await hass.async_block_till_done() - return await hass_ws_client(hass) async def test_device_clusters(hass, config_entry, zha_gateway, zha_client): """Test getting device cluster info.""" await zha_client.send_json( - {ID: 5, TYPE: "zha/devices/clusters", ATTR_IEEE: "00:0d:6f:00:0a:90:69:e7"} + {ID: 5, TYPE: "zha/devices/clusters", ATTR_IEEE: IEEE_SWITCH_DEVICE} ) msg = await zha_client.receive_json() @@ -86,14 +101,14 @@ async def test_device_clusters(hass, config_entry, zha_gateway, zha_client): assert cluster_info[ATTR_NAME] == "OnOff" -async def test_device_cluster_attributes(hass, config_entry, zha_gateway, zha_client): +async def test_device_cluster_attributes(zha_client): """Test getting device cluster attributes.""" await zha_client.send_json( { ID: 5, TYPE: "zha/devices/clusters/attributes", ATTR_ENDPOINT_ID: 1, - ATTR_IEEE: "00:0d:6f:00:0a:90:69:e7", + ATTR_IEEE: IEEE_SWITCH_DEVICE, ATTR_CLUSTER_ID: 6, ATTR_CLUSTER_TYPE: CLUSTER_TYPE_IN, } @@ -109,14 +124,14 @@ async def test_device_cluster_attributes(hass, config_entry, zha_gateway, zha_cl assert attribute[ATTR_NAME] is not None -async def test_device_cluster_commands(hass, config_entry, zha_gateway, zha_client): +async def test_device_cluster_commands(zha_client): """Test getting device cluster commands.""" await zha_client.send_json( { ID: 5, TYPE: "zha/devices/clusters/commands", ATTR_ENDPOINT_ID: 1, - ATTR_IEEE: "00:0d:6f:00:0a:90:69:e7", + ATTR_IEEE: IEEE_SWITCH_DEVICE, ATTR_CLUSTER_ID: 6, ATTR_CLUSTER_TYPE: CLUSTER_TYPE_IN, } @@ -133,7 +148,7 @@ async def test_device_cluster_commands(hass, config_entry, zha_gateway, zha_clie assert command[TYPE] is not None -async def test_list_devices(hass, config_entry, zha_gateway, zha_client): +async def test_list_devices(zha_client): """Test getting zha devices.""" await zha_client.send_json({ID: 5, TYPE: "zha/devices"}) @@ -164,7 +179,7 @@ async def test_list_devices(hass, config_entry, zha_gateway, zha_client): assert device == device2 -async def test_device_not_found(hass, config_entry, zha_gateway, zha_client): +async def test_device_not_found(zha_client): """Test not found response from get device API.""" await zha_client.send_json( {ID: 6, TYPE: "zha/device", ATTR_IEEE: "28:6d:97:00:01:04:11:8c"} @@ -176,7 +191,7 @@ async def test_device_not_found(hass, config_entry, zha_gateway, zha_client): assert msg["error"]["code"] == const.ERR_NOT_FOUND -async def test_list_groups(hass, config_entry, zha_gateway, zha_client): +async def test_list_groups(zha_client): """Test getting zha zigbee groups.""" await zha_client.send_json({ID: 7, TYPE: "zha/groups"}) @@ -193,7 +208,7 @@ async def test_list_groups(hass, config_entry, zha_gateway, zha_client): assert group["members"] == [] -async def test_get_group(hass, config_entry, zha_gateway, zha_client): +async def test_get_group(zha_client): """Test getting a specific zha zigbee group.""" await zha_client.send_json({ID: 8, TYPE: "zha/group", GROUP_ID: FIXTURE_GRP_ID}) @@ -208,7 +223,7 @@ async def test_get_group(hass, config_entry, zha_gateway, zha_client): assert group["members"] == [] -async def test_get_group_not_found(hass, config_entry, zha_gateway, zha_client): +async def test_get_group_not_found(zha_client): """Test not found response from get group API.""" await zha_client.send_json({ID: 9, TYPE: "zha/group", GROUP_ID: 1234567}) @@ -220,14 +235,9 @@ async def test_get_group_not_found(hass, config_entry, zha_gateway, zha_client): assert msg["error"]["code"] == const.ERR_NOT_FOUND -async def test_list_groupable_devices(hass, config_entry, zha_gateway, zha_client): +async def test_list_groupable_devices(zha_client, device_groupable): """Test getting zha devices that have a group cluster.""" - # Make device available - zha_gateway.devices[ - zigpy.types.EUI64.convert("01:2d:6f:00:0a:90:69:e8") - ].set_available(True) - await zha_client.send_json({ID: 10, TYPE: "zha/devices/groupable"}) msg = await zha_client.receive_json() @@ -251,9 +261,7 @@ async def test_list_groupable_devices(hass, config_entry, zha_gateway, zha_clien # Make sure there are no groupable devices when the device is unavailable # Make device unavailable - zha_gateway.devices[ - zigpy.types.EUI64.convert("01:2d:6f:00:0a:90:69:e8") - ].set_available(False) + device_groupable.set_available(False) await zha_client.send_json({ID: 11, TYPE: "zha/devices/groupable"}) @@ -265,7 +273,7 @@ async def test_list_groupable_devices(hass, config_entry, zha_gateway, zha_clien assert len(devices) == 0 -async def test_add_group(hass, config_entry, zha_gateway, zha_client): +async def test_add_group(zha_client): """Test adding and getting a new zha zigbee group.""" await zha_client.send_json({ID: 12, TYPE: "zha/group/add", GROUP_NAME: "new_group"}) @@ -291,7 +299,7 @@ async def test_add_group(hass, config_entry, zha_gateway, zha_client): assert group["name"] == FIXTURE_GRP_NAME or group["name"] == "new_group" -async def test_remove_group(hass, config_entry, zha_gateway, zha_client): +async def test_remove_group(zha_client): """Test removing a new zha zigbee group.""" await zha_client.send_json({ID: 14, TYPE: "zha/groups"}) diff --git a/tests/components/zha/test_binary_sensor.py b/tests/components/zha/test_binary_sensor.py index 2765a465ace..4be5f3c1f39 100644 --- a/tests/components/zha/test_binary_sensor.py +++ b/tests/components/zha/test_binary_sensor.py @@ -1,5 +1,5 @@ """Test zha binary sensor.""" -import zigpy.zcl.clusters.general as general +import pytest import zigpy.zcl.clusters.measurement as measurement import zigpy.zcl.clusters.security as security import zigpy.zcl.foundation as zcl_f @@ -9,76 +9,27 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, ) +DEVICE_IAS = { + 1: { + "device_type": 1026, + "in_clusters": [security.IasZone.cluster_id], + "out_clusters": [], + } +} -async def test_binary_sensor(hass, config_entry, zha_gateway): - """Test zha binary_sensor platform.""" - # create zigpy devices - zigpy_device_zone = await async_init_zigpy_device( - hass, - [security.IasZone.cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, - ieee="00:0d:6f:11:9a:90:69:e6", - ) - - zigpy_device_occupancy = await async_init_zigpy_device( - hass, - [measurement.OccupancySensing.cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, - ieee="00:0d:6f:11:9a:90:69:e7", - manufacturer="FakeOccupancy", - model="FakeOccupancyModel", - ) - - # load up binary_sensor domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - - # on off binary_sensor - zone_cluster = zigpy_device_zone.endpoints.get(1).ias_zone - zone_zha_device = zha_gateway.get_device(zigpy_device_zone.ieee) - zone_entity_id = await find_entity_id(DOMAIN, zone_zha_device, hass) - assert zone_entity_id is not None - - # occupancy binary_sensor - occupancy_cluster = zigpy_device_occupancy.endpoints.get(1).occupancy - occupancy_zha_device = zha_gateway.get_device(zigpy_device_occupancy.ieee) - occupancy_entity_id = await find_entity_id(DOMAIN, occupancy_zha_device, hass) - assert occupancy_entity_id is not None - - # test that the sensors exist and are in the unavailable state - assert hass.states.get(zone_entity_id).state == STATE_UNAVAILABLE - assert hass.states.get(occupancy_entity_id).state == STATE_UNAVAILABLE - - await async_enable_traffic( - hass, zha_gateway, [zone_zha_device, occupancy_zha_device] - ) - - # test that the sensors exist and are in the off state - assert hass.states.get(zone_entity_id).state == STATE_OFF - assert hass.states.get(occupancy_entity_id).state == STATE_OFF - - # test getting messages that trigger and reset the sensors - await async_test_binary_sensor_on_off(hass, occupancy_cluster, occupancy_entity_id) - - # test IASZone binary sensors - await async_test_iaszone_on_off(hass, zone_cluster, zone_entity_id) - - # test new sensor join - await async_test_device_join( - hass, zha_gateway, measurement.OccupancySensing.cluster_id, occupancy_entity_id - ) +DEVICE_OCCUPANCY = { + 1: { + "device_type": 263, + "in_clusters": [measurement.OccupancySensing.cluster_id], + "out_clusters": [], + } +} async def async_test_binary_sensor_on_off(hass, cluster, entity_id): @@ -109,3 +60,52 @@ async def async_test_iaszone_on_off(hass, cluster, entity_id): cluster.listener_event("cluster_command", 1, 0, [0]) await hass.async_block_till_done() assert hass.states.get(entity_id).state == STATE_OFF + + +@pytest.mark.parametrize( + "device, on_off_test, cluster_name, reporting", + [ + (DEVICE_IAS, async_test_iaszone_on_off, "ias_zone", False), + (DEVICE_OCCUPANCY, async_test_binary_sensor_on_off, "occupancy", True), + ], +) +async def test_binary_sensor( + hass, + zha_gateway, + zigpy_device_mock, + zha_device_joined_restored, + device, + on_off_test, + cluster_name, + reporting, +): + """Test ZHA binary_sensor platform.""" + zigpy_device = zigpy_device_mock(device) + zha_device = await zha_device_joined_restored(zigpy_device) + entity_id = await find_entity_id(DOMAIN, zha_device, hass) + + assert entity_id is not None + + # test that the sensors exist and are in the unavailable state + assert hass.states.get(entity_id).state == STATE_UNAVAILABLE + + await async_enable_traffic(hass, zha_gateway, [zha_device]) + + # test that the sensors exist and are in the off state + assert hass.states.get(entity_id).state == STATE_OFF + + # test getting messages that trigger and reset the sensors + cluster = getattr(zigpy_device.endpoints[1], cluster_name) + await on_off_test(hass, cluster, entity_id) + + # test rejoin + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_device) + await hass.async_block_till_done() + assert hass.states.get(entity_id).state == STATE_OFF + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + if reporting: + assert cluster.configure_reporting.call_count > 0 + assert cluster.configure_reporting.await_count > 0 diff --git a/tests/components/zha/test_cover.py b/tests/components/zha/test_cover.py index 9d1c019c718..321ad95e73e 100644 --- a/tests/components/zha/test_cover.py +++ b/tests/components/zha/test_cover.py @@ -1,9 +1,10 @@ """Test zha cover.""" from unittest.mock import MagicMock, call, patch +import asynctest +import pytest import zigpy.types import zigpy.zcl.clusters.closures as closures -import zigpy.zcl.clusters.general as general import zigpy.zcl.foundation as zcl_f from homeassistant.components.cover import DOMAIN @@ -11,8 +12,6 @@ from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, @@ -21,17 +20,27 @@ from .common import ( from tests.common import mock_coro -async def test_cover(hass, config_entry, zha_gateway): - """Test zha cover platform.""" +@pytest.fixture +def zigpy_cover_device(zigpy_device_mock): + """Zigpy cover device.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, - [closures.WindowCovering.cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, - ) + endpoints = { + 1: { + "device_type": 1026, + "in_clusters": [closures.WindowCovering.cluster_id], + "out_clusters": [], + } + } + return zigpy_device_mock(endpoints) + + +@asynctest.patch( + "homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize" +) +async def test_cover( + m1, hass, zha_gateway, zha_device_joined_restored, zigpy_cover_device +): + """Test zha cover platform.""" async def get_chan_attr(*args, **kwargs): return 100 @@ -41,13 +50,11 @@ async def test_cover(hass, config_entry, zha_gateway): new=MagicMock(side_effect=get_chan_attr), ) as get_attr_mock: # load up cover domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() + zha_device = await zha_device_joined_restored(zigpy_cover_device) assert get_attr_mock.call_count == 2 assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage" - cluster = zigpy_device.endpoints.get(1).window_covering - zha_device = zha_gateway.get_device(zigpy_device.ieee) + cluster = zigpy_cover_device.endpoints.get(1).window_covering entity_id = await find_entity_id(DOMAIN, zha_device, hass) assert entity_id is not None @@ -124,6 +131,13 @@ async def test_cover(hass, config_entry, zha_gateway): False, 0x2, (), expect_reply=True, manufacturer=None ) - await async_test_device_join( - hass, zha_gateway, closures.WindowCovering.cluster_id, entity_id - ) + # test rejoin + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_cover_device) + await hass.async_block_till_done() + assert hass.states.get(entity_id).state == STATE_OPEN + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + assert cluster.configure_reporting.call_count == 1 + assert cluster.configure_reporting.await_count == 1 diff --git a/tests/components/zha/test_device_action.py b/tests/components/zha/test_device_action.py index c3195559d20..4f41878f952 100644 --- a/tests/components/zha/test_device_action.py +++ b/tests/components/zha/test_device_action.py @@ -11,12 +11,10 @@ from homeassistant.components.device_automation import ( _async_get_device_automations as async_get_device_automations, ) from homeassistant.components.zha import DOMAIN -from homeassistant.components.zha.core.const import CHANNEL_ON_OFF +from homeassistant.components.zha.core.const import CHANNEL_EVENT_RELAY from homeassistant.helpers.device_registry import async_get_registry from homeassistant.setup import async_setup_component -from .common import async_enable_traffic, async_init_zigpy_device - from tests.common import async_mock_service, mock_coro SHORT_PRESS = "remote_button_short_press" @@ -30,28 +28,31 @@ def calls(hass): return async_mock_service(hass, "zha", "warning_device_warn") -async def test_get_actions(hass, config_entry, zha_gateway): - """Test we get the expected actions from a zha device.""" +@pytest.fixture +async def device_ias(hass, zha_gateway, zigpy_device_mock, zha_device_joined_restored): + """IAS device fixture.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, - [ - general.Basic.cluster_id, - security.IasZone.cluster_id, - security.IasWd.cluster_id, - ], - [], - None, - zha_gateway, + clusters = [general.Basic, security.IasZone, security.IasWd] + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [c.cluster_id for c in clusters], + "out_clusters": [general.OnOff.cluster_id], + "device_type": 0, + } + }, ) - await hass.config_entries.async_forward_entry_setup(config_entry, "binary_sensor") + zha_device = await zha_device_joined_restored(zigpy_device) + zha_device.update_available(True) await hass.async_block_till_done() - hass.config_entries._entries.append(config_entry) + return zigpy_device, zha_device - zha_device = zha_gateway.get_device(zigpy_device.ieee) - ieee_address = str(zha_device.ieee) + +async def test_get_actions(hass, device_ias): + """Test we get the expected actions from a zha device.""" + + ieee_address = str(device_ias[0].ieee) ha_device_registry = await async_get_registry(hass) reg_device = ha_device_registry.async_get_device({(DOMAIN, ieee_address)}, set()) @@ -66,40 +67,19 @@ async def test_get_actions(hass, config_entry, zha_gateway): assert actions == expected_actions -async def test_action(hass, config_entry, zha_gateway, calls): +async def test_action(hass, calls, device_ias): """Test for executing a zha device action.""" - - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, - [ - general.Basic.cluster_id, - security.IasZone.cluster_id, - security.IasWd.cluster_id, - ], - [general.OnOff.cluster_id], - None, - zha_gateway, - ) + zigpy_device, zha_device = device_ias zigpy_device.device_automation_triggers = { (SHORT_PRESS, SHORT_PRESS): {COMMAND: COMMAND_SINGLE} } - await hass.config_entries.async_forward_entry_setup(config_entry, "switch") - await hass.async_block_till_done() - - hass.config_entries._entries.append(config_entry) - - zha_device = zha_gateway.get_device(zigpy_device.ieee) ieee_address = str(zha_device.ieee) ha_device_registry = await async_get_registry(hass) reg_device = ha_device_registry.async_get_device({(DOMAIN, ieee_address)}, set()) - # allow traffic to flow through the gateway and device - await async_enable_traffic(hass, zha_gateway, [zha_device]) - with patch( "zigpy.zcl.Cluster.request", return_value=mock_coro([0x00, zcl_f.Status.SUCCESS]), @@ -129,8 +109,8 @@ async def test_action(hass, config_entry, zha_gateway, calls): await hass.async_block_till_done() - on_off_channel = zha_device.cluster_channels[CHANNEL_ON_OFF] - on_off_channel.zha_send_event(on_off_channel.cluster, COMMAND_SINGLE, []) + channel = {ch.name: ch for ch in zha_device.all_channels}[CHANNEL_EVENT_RELAY] + channel.zha_send_event(channel.cluster, COMMAND_SINGLE, []) await hass.async_block_till_done() assert len(calls) == 1 diff --git a/tests/components/zha/test_device_tracker.py b/tests/components/zha/test_device_tracker.py index bac338ae5e0..fe5661c4776 100644 --- a/tests/components/zha/test_device_tracker.py +++ b/tests/components/zha/test_device_tracker.py @@ -2,6 +2,7 @@ from datetime import timedelta import time +import pytest import zigpy.zcl.clusters.general as general import zigpy.zcl.foundation as zcl_f @@ -14,8 +15,6 @@ import homeassistant.util.dt as dt_util from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, @@ -24,37 +23,39 @@ from .common import ( from tests.common import async_fire_time_changed -async def test_device_tracker(hass, config_entry, zha_gateway): +@pytest.fixture +def zigpy_device_dt(zigpy_device_mock): + """Device tracker zigpy device.""" + endpoints = { + 1: { + "in_clusters": [ + general.Basic.cluster_id, + general.PowerConfiguration.cluster_id, + general.Identify.cluster_id, + general.PollControl.cluster_id, + general.BinaryInput.cluster_id, + ], + "out_clusters": [general.Identify.cluster_id, general.Ota.cluster_id], + "device_type": SMARTTHINGS_ARRIVAL_SENSOR_DEVICE_TYPE, + } + } + return zigpy_device_mock(endpoints) + + +async def test_device_tracker( + hass, zha_gateway, zha_device_joined_restored, zigpy_device_dt +): """Test zha device tracker platform.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, - [ - general.Basic.cluster_id, - general.PowerConfiguration.cluster_id, - general.Identify.cluster_id, - general.PollControl.cluster_id, - general.BinaryInput.cluster_id, - ], - [general.Identify.cluster_id, general.Ota.cluster_id], - SMARTTHINGS_ARRIVAL_SENSOR_DEVICE_TYPE, - zha_gateway, - ) - - # load up device tracker domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - - cluster = zigpy_device.endpoints.get(1).power - zha_device = zha_gateway.get_device(zigpy_device.ieee) + zha_device = await zha_device_joined_restored(zigpy_device_dt) + cluster = zigpy_device_dt.endpoints.get(1).power entity_id = await find_entity_id(DOMAIN, zha_device, hass) assert entity_id is not None # test that the device tracker was created and that it is unavailable assert hass.states.get(entity_id).state == STATE_UNAVAILABLE - zigpy_device.last_seen = time.time() - 120 + zigpy_device_dt.last_seen = time.time() - 120 next_update = dt_util.utcnow() + timedelta(seconds=30) async_fire_time_changed(hass, next_update) await hass.async_block_till_done() @@ -73,7 +74,7 @@ async def test_device_tracker(hass, config_entry, zha_gateway): attr = make_attribute(0x0021, 200) cluster.handle_message(hdr, [[attr]]) - zigpy_device.last_seen = time.time() + 10 + zigpy_device_dt.last_seen = time.time() + 10 next_update = dt_util.utcnow() + timedelta(seconds=30) async_fire_time_changed(hass, next_update) await hass.async_block_till_done() @@ -87,10 +88,12 @@ async def test_device_tracker(hass, config_entry, zha_gateway): assert entity.battery_level == 100 # test adding device tracker to the network and HA - await async_test_device_join( - hass, - zha_gateway, - general.PowerConfiguration.cluster_id, - entity_id, - SMARTTHINGS_ARRIVAL_SENSOR_DEVICE_TYPE, - ) + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_device_dt) + await hass.async_block_till_done() + assert hass.states.get(entity_id).state == STATE_HOME + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + assert cluster.configure_reporting.call_count == 2 + assert cluster.configure_reporting.await_count == 2 diff --git a/tests/components/zha/test_device_trigger.py b/tests/components/zha/test_device_trigger.py index 973b6673671..56f5c6c85ba 100644 --- a/tests/components/zha/test_device_trigger.py +++ b/tests/components/zha/test_device_trigger.py @@ -3,13 +3,10 @@ import pytest import zigpy.zcl.clusters.general as general import homeassistant.components.automation as automation -from homeassistant.components.switch import DOMAIN -from homeassistant.components.zha.core.const import CHANNEL_ON_OFF +from homeassistant.components.zha.core.const import CHANNEL_EVENT_RELAY from homeassistant.helpers.device_registry import async_get_registry from homeassistant.setup import async_setup_component -from .common import async_enable_traffic, async_init_zigpy_device - from tests.common import async_get_device_automations, async_mock_service ON = 1 @@ -42,13 +39,31 @@ def calls(hass): return async_mock_service(hass, "test", "automation") -async def test_triggers(hass, config_entry, zha_gateway): +@pytest.fixture(params=["zha_device_joined", "zha_device_restored"]) +async def mock_devices(hass, zha_gateway, zigpy_device_mock, request): + """IAS device fixture.""" + + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [general.Basic.cluster_id], + "out_clusters": [general.OnOff.cluster_id], + "device_type": 0, + } + }, + ) + + join_or_restore = request.getfixturevalue(request.param) + zha_device = await join_or_restore(zigpy_device) + zha_device.update_available(True) + await hass.async_block_till_done() + return zigpy_device, zha_device + + +async def test_triggers(hass, mock_devices): """Test zha device triggers.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway - ) + zigpy_device, zha_device = mock_devices zigpy_device.device_automation_triggers = { (SHAKEN, SHAKEN): {COMMAND: COMMAND_SHAKE}, @@ -58,11 +73,6 @@ async def test_triggers(hass, config_entry, zha_gateway): (LONG_RELEASE, LONG_RELEASE): {COMMAND: COMMAND_HOLD}, } - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - hass.config_entries._entries.append(config_entry) - - zha_device = zha_gateway.get_device(zigpy_device.ieee) ieee_address = str(zha_device.ieee) ha_device_registry = await async_get_registry(hass) @@ -110,19 +120,10 @@ async def test_triggers(hass, config_entry, zha_gateway): assert _same_lists(triggers, expected_triggers) -async def test_no_triggers(hass, config_entry, zha_gateway): +async def test_no_triggers(hass, mock_devices): """Test zha device with no triggers.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway - ) - - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - hass.config_entries._entries.append(config_entry) - - zha_device = zha_gateway.get_device(zigpy_device.ieee) + _, zha_device = mock_devices ieee_address = str(zha_device.ieee) ha_device_registry = await async_get_registry(hass) @@ -132,13 +133,10 @@ async def test_no_triggers(hass, config_entry, zha_gateway): assert triggers == [] -async def test_if_fires_on_event(hass, config_entry, zha_gateway, calls): +async def test_if_fires_on_event(hass, mock_devices, calls): """Test for remote triggers firing.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway - ) + zigpy_device, zha_device = mock_devices zigpy_device.device_automation_triggers = { (SHAKEN, SHAKEN): {COMMAND: COMMAND_SHAKE}, @@ -148,15 +146,6 @@ async def test_if_fires_on_event(hass, config_entry, zha_gateway, calls): (LONG_RELEASE, LONG_RELEASE): {COMMAND: COMMAND_HOLD}, } - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - hass.config_entries._entries.append(config_entry) - - zha_device = zha_gateway.get_device(zigpy_device.ieee) - - # allow traffic to flow through the gateway and device - await async_enable_traffic(hass, zha_gateway, [zha_device]) - ieee_address = str(zha_device.ieee) ha_device_registry = await async_get_registry(hass) reg_device = ha_device_registry.async_get_device({("zha", ieee_address)}, set()) @@ -185,30 +174,18 @@ async def test_if_fires_on_event(hass, config_entry, zha_gateway, calls): await hass.async_block_till_done() - on_off_channel = zha_device.cluster_channels[CHANNEL_ON_OFF] - on_off_channel.zha_send_event(on_off_channel.cluster, COMMAND_SINGLE, []) + channel = {ch.name: ch for ch in zha_device.all_channels}[CHANNEL_EVENT_RELAY] + channel.zha_send_event(channel.cluster, COMMAND_SINGLE, []) await hass.async_block_till_done() assert len(calls) == 1 assert calls[0].data["message"] == "service called" -async def test_exception_no_triggers(hass, config_entry, zha_gateway, calls, caplog): +async def test_exception_no_triggers(hass, mock_devices, calls, caplog): """Test for exception on event triggers firing.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway - ) - - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - hass.config_entries._entries.append(config_entry) - - zha_device = zha_gateway.get_device(zigpy_device.ieee) - - # allow traffic to flow through the gateway and device - await async_enable_traffic(hass, zha_gateway, [zha_device]) + _, zha_device = mock_devices ieee_address = str(zha_device.ieee) ha_device_registry = await async_get_registry(hass) @@ -239,13 +216,10 @@ async def test_exception_no_triggers(hass, config_entry, zha_gateway, calls, cap assert "Invalid config for [automation]" in caplog.text -async def test_exception_bad_trigger(hass, config_entry, zha_gateway, calls, caplog): +async def test_exception_bad_trigger(hass, mock_devices, calls, caplog): """Test for exception on event triggers firing.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway - ) + zigpy_device, zha_device = mock_devices zigpy_device.device_automation_triggers = { (SHAKEN, SHAKEN): {COMMAND: COMMAND_SHAKE}, @@ -255,15 +229,6 @@ async def test_exception_bad_trigger(hass, config_entry, zha_gateway, calls, cap (LONG_RELEASE, LONG_RELEASE): {COMMAND: COMMAND_HOLD}, } - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - hass.config_entries._entries.append(config_entry) - - zha_device = zha_gateway.get_device(zigpy_device.ieee) - - # allow traffic to flow through the gateway and device - await async_enable_traffic(hass, zha_gateway, [zha_device]) - ieee_address = str(zha_device.ieee) ha_device_registry = await async_get_registry(hass) reg_device = ha_device_registry.async_get_device({("zha", ieee_address)}, set()) diff --git a/tests/components/zha/test_fan.py b/tests/components/zha/test_fan.py index 660bff2abac..48222993a3a 100644 --- a/tests/components/zha/test_fan.py +++ b/tests/components/zha/test_fan.py @@ -1,7 +1,7 @@ """Test zha fan.""" from unittest.mock import call, patch -import zigpy.zcl.clusters.general as general +import pytest import zigpy.zcl.clusters.hvac as hvac import zigpy.zcl.foundation as zcl_f @@ -18,8 +18,6 @@ from homeassistant.const import ( from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, @@ -28,20 +26,20 @@ from .common import ( from tests.common import mock_coro -async def test_fan(hass, config_entry, zha_gateway): +@pytest.fixture +def zigpy_device(zigpy_device_mock): + """Device tracker zigpy device.""" + endpoints = { + 1: {"in_clusters": [hvac.Fan.cluster_id], "out_clusters": [], "device_type": 0} + } + return zigpy_device_mock(endpoints) + + +async def test_fan(hass, zha_gateway, zha_device_joined_restored, zigpy_device): """Test zha fan platform.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [hvac.Fan.cluster_id, general.Basic.cluster_id], [], None, zha_gateway - ) - - # load up fan domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - + zha_device = await zha_device_joined_restored(zigpy_device) cluster = zigpy_device.endpoints.get(1).fan - zha_device = zha_gateway.get_device(zigpy_device.ieee) entity_id = await find_entity_id(DOMAIN, zha_device, hass) assert entity_id is not None @@ -98,7 +96,14 @@ async def test_fan(hass, config_entry, zha_gateway): assert cluster.write_attributes.call_args == call({"fan_mode": 3}) # test adding new fan to the network and HA - await async_test_device_join(hass, zha_gateway, hvac.Fan.cluster_id, entity_id) + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_device) + await hass.async_block_till_done() + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + assert cluster.configure_reporting.call_count == 1 + assert cluster.configure_reporting.await_count == 1 async def async_turn_on(hass, entity_id, speed=None): diff --git a/tests/components/zha/test_gateway.py b/tests/components/zha/test_gateway.py index 5c6e8ecfe7a..45f47a1fa87 100644 --- a/tests/components/zha/test_gateway.py +++ b/tests/components/zha/test_gateway.py @@ -1,29 +1,39 @@ """Test ZHA Gateway.""" +import pytest import zigpy.zcl.clusters.general as general -import homeassistant.components.zha.core.const as zha_const - -from .common import async_enable_traffic, async_init_zigpy_device +from .common import async_enable_traffic -async def test_device_left(hass, config_entry, zha_gateway): - """Test zha fan platform.""" - - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, [general.Basic.cluster_id], [], None, zha_gateway +@pytest.fixture +def zigpy_dev_basic(zigpy_device_mock): + """Zigpy device with just a basic cluster.""" + return zigpy_device_mock( + { + 1: { + "in_clusters": [general.Basic.cluster_id], + "out_clusters": [], + "device_type": 0, + } + }, ) - # load up fan domain - await hass.config_entries.async_forward_entry_setup(config_entry, zha_const.SENSOR) - await hass.async_block_till_done() - zha_device = zha_gateway.get_device(zigpy_device.ieee) +@pytest.fixture +async def zha_dev_basic(hass, zha_device_restored, zigpy_dev_basic): + """ZHA device with just a basic cluster.""" - assert zha_device.available is False + zha_device = await zha_device_restored(zigpy_dev_basic) + return zha_device - await async_enable_traffic(hass, zha_gateway, [zha_device]) - assert zha_device.available is True - zha_gateway.device_left(zigpy_device) - assert zha_device.available is False +async def test_device_left(hass, zha_gateway, zigpy_dev_basic, zha_dev_basic): + """Device leaving the network should become unavailable.""" + + assert zha_dev_basic.available is False + + await async_enable_traffic(hass, zha_gateway, [zha_dev_basic]) + assert zha_dev_basic.available is True + + zha_gateway.device_left(zigpy_dev_basic) + assert zha_dev_basic.available is False diff --git a/tests/components/zha/test_light.py b/tests/components/zha/test_light.py index 53be188ae80..5df379d1d7a 100644 --- a/tests/components/zha/test_light.py +++ b/tests/components/zha/test_light.py @@ -1,10 +1,12 @@ """Test zha light.""" -import asyncio -from unittest.mock import MagicMock, call, patch, sentinel +from unittest.mock import call, sentinel +import asynctest +import pytest import zigpy.profiles.zha import zigpy.types import zigpy.zcl.clusters.general as general +import zigpy.zcl.clusters.lighting as lighting import zigpy.zcl.foundation as zcl_f from homeassistant.components.light import DOMAIN @@ -12,121 +14,124 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, ) -from tests.common import mock_coro - ON = 1 OFF = 0 +LIGHT_ON_OFF = { + 1: { + "device_type": zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT, + "in_clusters": [general.Basic.cluster_id, general.OnOff.cluster_id], + "out_clusters": [general.Ota.cluster_id], + } +} -async def test_light(hass, config_entry, zha_gateway, monkeypatch): +LIGHT_LEVEL = { + 1: { + "device_type": zigpy.profiles.zha.DeviceType.DIMMABLE_LIGHT, + "in_clusters": [ + general.Basic.cluster_id, + general.LevelControl.cluster_id, + general.OnOff.cluster_id, + ], + "out_clusters": [general.Ota.cluster_id], + } +} + +LIGHT_COLOR = { + 1: { + "device_type": zigpy.profiles.zha.DeviceType.COLOR_DIMMABLE_LIGHT, + "in_clusters": [ + general.Basic.cluster_id, + general.LevelControl.cluster_id, + general.OnOff.cluster_id, + lighting.Color.cluster_id, + ], + "out_clusters": [general.Ota.cluster_id], + } +} + + +@asynctest.patch( + "zigpy.zcl.clusters.lighting.Color.request", + new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]), +) +@asynctest.patch( + "zigpy.zcl.clusters.general.LevelControl.request", + new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]), +) +@asynctest.patch( + "zigpy.zcl.clusters.general.OnOff.request", + new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]), +) +@pytest.mark.parametrize( + "device, reporting", + [(LIGHT_ON_OFF, (1, 0, 0)), (LIGHT_LEVEL, (1, 1, 0)), (LIGHT_COLOR, (1, 1, 3))], +) +async def test_light( + hass, zha_gateway, zigpy_device_mock, zha_device_joined_restored, device, reporting, +): """Test zha light platform.""" # create zigpy devices - zigpy_device_on_off = await async_init_zigpy_device( - hass, - [general.OnOff.cluster_id, general.Basic.cluster_id], - [], - zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT, - zha_gateway, - ieee="00:0d:6f:11:0a:90:69:e6", - ) + zigpy_device = zigpy_device_mock(device) + zha_device = await zha_device_joined_restored(zigpy_device) + entity_id = await find_entity_id(DOMAIN, zha_device, hass) - zigpy_device_level = await async_init_zigpy_device( - hass, - [ - general.OnOff.cluster_id, - general.LevelControl.cluster_id, - general.Basic.cluster_id, - ], - [], - zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT, - zha_gateway, - ieee="00:0d:6f:11:0a:90:69:e7", - manufacturer="FakeLevelManufacturer", - model="FakeLevelModel", - ) + assert entity_id is not None - # load up light domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - - # on off light - on_off_device_on_off_cluster = zigpy_device_on_off.endpoints.get(1).on_off - on_off_zha_device = zha_gateway.get_device(zigpy_device_on_off.ieee) - on_off_entity_id = await find_entity_id(DOMAIN, on_off_zha_device, hass) - assert on_off_entity_id is not None - - # dimmable light - level_device_on_off_cluster = zigpy_device_level.endpoints.get(1).on_off - level_device_level_cluster = zigpy_device_level.endpoints.get(1).level - on_off_mock = MagicMock( - side_effect=asyncio.coroutine( - MagicMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]) - ) - ) - level_mock = MagicMock( - side_effect=asyncio.coroutine( - MagicMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]) - ) - ) - monkeypatch.setattr(level_device_on_off_cluster, "request", on_off_mock) - monkeypatch.setattr(level_device_level_cluster, "request", level_mock) - level_zha_device = zha_gateway.get_device(zigpy_device_level.ieee) - level_entity_id = await find_entity_id(DOMAIN, level_zha_device, hass) - assert level_entity_id is not None + cluster_on_off = zigpy_device.endpoints[1].on_off + cluster_level = getattr(zigpy_device.endpoints[1], "level", None) + cluster_color = getattr(zigpy_device.endpoints[1], "light_color", None) # test that the lights were created and that they are unavailable - assert hass.states.get(on_off_entity_id).state == STATE_UNAVAILABLE - assert hass.states.get(level_entity_id).state == STATE_UNAVAILABLE + assert hass.states.get(entity_id).state == STATE_UNAVAILABLE # allow traffic to flow through the gateway and device - await async_enable_traffic(hass, zha_gateway, [on_off_zha_device, level_zha_device]) + await async_enable_traffic(hass, zha_gateway, [zha_device]) # test that the lights were created and are off - assert hass.states.get(on_off_entity_id).state == STATE_OFF - assert hass.states.get(level_entity_id).state == STATE_OFF + assert hass.states.get(entity_id).state == STATE_OFF # test turning the lights on and off from the light - await async_test_on_off_from_light( - hass, on_off_device_on_off_cluster, on_off_entity_id - ) - - await async_test_on_off_from_light( - hass, level_device_on_off_cluster, level_entity_id - ) + await async_test_on_off_from_light(hass, cluster_on_off, entity_id) # test turning the lights on and off from the HA - await async_test_on_off_from_hass( - hass, on_off_device_on_off_cluster, on_off_entity_id - ) + await async_test_on_off_from_hass(hass, cluster_on_off, entity_id) - await async_test_level_on_off_from_hass( - hass, level_device_on_off_cluster, level_device_level_cluster, level_entity_id - ) + if cluster_level: + await async_test_level_on_off_from_hass( + hass, cluster_on_off, cluster_level, entity_id + ) - # test turning the lights on and off from the light - await async_test_on_from_light(hass, level_device_on_off_cluster, level_entity_id) + # test getting a brightness change from the network + await async_test_on_from_light(hass, cluster_on_off, entity_id) + await async_test_dimmer_from_light( + hass, cluster_level, entity_id, 150, STATE_ON + ) - # test getting a brightness change from the network - await async_test_dimmer_from_light( - hass, level_device_level_cluster, level_entity_id, 150, STATE_ON - ) - - # test adding a new light to the network and HA - await async_test_device_join( - hass, - zha_gateway, - general.OnOff.cluster_id, - on_off_entity_id, - device_type=zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT, - ) + # test rejoin + await async_test_off_from_hass(hass, cluster_on_off, entity_id) + clusters = [cluster_on_off] + if cluster_level: + clusters.append(cluster_level) + if cluster_color: + clusters.append(cluster_color) + for cluster in clusters: + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_device) + await hass.async_block_till_done() + assert hass.states.get(entity_id).state == STATE_OFF + for cluster, reporting_count in zip(clusters, reporting): + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + assert cluster.configure_reporting.call_count == reporting_count + assert cluster.configure_reporting.await_count == reporting_count async def async_test_on_off_from_light(hass, cluster, entity_id): @@ -157,36 +162,33 @@ async def async_test_on_from_light(hass, cluster, entity_id): async def async_test_on_off_from_hass(hass, cluster, entity_id): """Test on off functionality from hass.""" - with patch( - "zigpy.zcl.Cluster.request", - return_value=mock_coro([0x00, zcl_f.Status.SUCCESS]), - ): - # turn on via UI - await hass.services.async_call( - DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True - ) - assert cluster.request.call_count == 1 - assert cluster.request.call_args == call( - False, ON, (), expect_reply=True, manufacturer=None - ) + # turn on via UI + cluster.request.reset_mock() + await hass.services.async_call( + DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True + ) + assert cluster.request.call_count == 1 + assert cluster.request.await_count == 1 + assert cluster.request.call_args == call( + False, ON, (), expect_reply=True, manufacturer=None + ) await async_test_off_from_hass(hass, cluster, entity_id) async def async_test_off_from_hass(hass, cluster, entity_id): """Test turning off the light from Home Assistant.""" - with patch( - "zigpy.zcl.Cluster.request", - return_value=mock_coro([0x01, zcl_f.Status.SUCCESS]), - ): - # turn off via UI - await hass.services.async_call( - DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True - ) - assert cluster.request.call_count == 1 - assert cluster.request.call_args == call( - False, OFF, (), expect_reply=True, manufacturer=None - ) + + # turn off via UI + cluster.request.reset_mock() + await hass.services.async_call( + DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + assert cluster.request.call_count == 1 + assert cluster.request.await_count == 1 + assert cluster.request.call_args == call( + False, OFF, (), expect_reply=True, manufacturer=None + ) async def async_test_level_on_off_from_hass( @@ -194,12 +196,15 @@ async def async_test_level_on_off_from_hass( ): """Test on off functionality from hass.""" + on_off_cluster.request.reset_mock() # turn on via UI await hass.services.async_call( DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True ) assert on_off_cluster.request.call_count == 1 + assert on_off_cluster.request.await_count == 1 assert level_cluster.request.call_count == 0 + assert level_cluster.request.await_count == 0 assert on_off_cluster.request.call_args == call( False, 1, (), expect_reply=True, manufacturer=None ) @@ -210,7 +215,9 @@ async def async_test_level_on_off_from_hass( DOMAIN, "turn_on", {"entity_id": entity_id, "transition": 10}, blocking=True ) assert on_off_cluster.request.call_count == 1 + assert on_off_cluster.request.await_count == 1 assert level_cluster.request.call_count == 1 + assert level_cluster.request.await_count == 1 assert on_off_cluster.request.call_args == call( False, 1, (), expect_reply=True, manufacturer=None ) @@ -230,7 +237,9 @@ async def async_test_level_on_off_from_hass( DOMAIN, "turn_on", {"entity_id": entity_id, "brightness": 10}, blocking=True ) assert on_off_cluster.request.call_count == 1 + assert on_off_cluster.request.await_count == 1 assert level_cluster.request.call_count == 1 + assert level_cluster.request.await_count == 1 assert on_off_cluster.request.call_args == call( False, 1, (), expect_reply=True, manufacturer=None ) diff --git a/tests/components/zha/test_lock.py b/tests/components/zha/test_lock.py index 1daef317fed..2c5dc9f41ba 100644 --- a/tests/components/zha/test_lock.py +++ b/tests/components/zha/test_lock.py @@ -1,6 +1,8 @@ """Test zha lock.""" from unittest.mock import patch +import pytest +import zigpy.profiles.zha import zigpy.zcl.clusters.closures as closures import zigpy.zcl.clusters.general as general import zigpy.zcl.foundation as zcl_f @@ -10,7 +12,6 @@ from homeassistant.const import STATE_LOCKED, STATE_UNAVAILABLE, STATE_UNLOCKED from .common import ( async_enable_traffic, - async_init_zigpy_device, find_entity_id, make_attribute, make_zcl_header, @@ -22,24 +23,29 @@ LOCK_DOOR = 0 UNLOCK_DOOR = 1 -async def test_lock(hass, config_entry, zha_gateway): - """Test zha lock platform.""" +@pytest.fixture(params=["zha_device_joined", "zha_device_restored"]) +async def lock(hass, zha_gateway, zigpy_device_mock, request): + """Lock cluster fixture.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, - [closures.DoorLock.cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [closures.DoorLock.cluster_id, general.Basic.cluster_id], + "out_clusters": [], + "device_type": zigpy.profiles.zha.DeviceType.DOOR_LOCK, + } + }, ) - # load up lock domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() + join_or_restore = request.getfixturevalue(request.param) + zha_device = await join_or_restore(zigpy_device) + return zha_device, zigpy_device.endpoints[1].door_lock - cluster = zigpy_device.endpoints.get(1).door_lock - zha_device = zha_gateway.get_device(zigpy_device.ieee) + +async def test_lock(hass, zha_gateway, lock): + """Test zha lock platform.""" + + zha_device, cluster = lock entity_id = await find_entity_id(DOMAIN, zha_device, hass) assert entity_id is not None diff --git a/tests/components/zha/test_sensor.py b/tests/components/zha/test_sensor.py index b4fe0883bf3..b78ea8a3583 100644 --- a/tests/components/zha/test_sensor.py +++ b/tests/components/zha/test_sensor.py @@ -11,7 +11,6 @@ import zigpy.zcl.foundation as zcl_f from homeassistant.components.sensor import DOMAIN import homeassistant.config as config_util from homeassistant.const import ( - ATTR_ENTITY_ID, ATTR_UNIT_OF_MEASUREMENT, CONF_UNIT_SYSTEM, CONF_UNIT_SYSTEM_IMPERIAL, @@ -26,160 +25,43 @@ from homeassistant.util import dt as dt_util from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, ) -async def test_sensor(hass, config_entry, zha_gateway): - """Test zha sensor platform.""" - - # list of cluster ids to create devices and sensor entities for - cluster_ids = [ - measurement.RelativeHumidity.cluster_id, - measurement.TemperatureMeasurement.cluster_id, - measurement.PressureMeasurement.cluster_id, - measurement.IlluminanceMeasurement.cluster_id, - smartenergy.Metering.cluster_id, - homeautomation.ElectricalMeasurement.cluster_id, - ] - - # devices that were created from cluster_ids list above - zigpy_device_infos = await async_build_devices( - hass, zha_gateway, config_entry, cluster_ids - ) - - # ensure the sensor entity was created for each id in cluster_ids - for cluster_id in cluster_ids: - zigpy_device_info = zigpy_device_infos[cluster_id] - entity_id = zigpy_device_info[ATTR_ENTITY_ID] - assert hass.states.get(entity_id).state == STATE_UNAVAILABLE - - # allow traffic to flow through the gateway and devices - await async_enable_traffic( - hass, - zha_gateway, - [ - zigpy_device_info["zha_device"] - for zigpy_device_info in zigpy_device_infos.values() - ], - ) - - # test that the sensors now have a state of unknown - for cluster_id in cluster_ids: - zigpy_device_info = zigpy_device_infos[cluster_id] - entity_id = zigpy_device_info[ATTR_ENTITY_ID] - assert hass.states.get(entity_id).state == STATE_UNKNOWN - - # get the humidity device info and test the associated sensor logic - device_info = zigpy_device_infos[measurement.RelativeHumidity.cluster_id] - await async_test_humidity(hass, device_info) - - # get the temperature device info and test the associated sensor logic - device_info = zigpy_device_infos[measurement.TemperatureMeasurement.cluster_id] - await async_test_temperature(hass, device_info) - - # get the pressure device info and test the associated sensor logic - device_info = zigpy_device_infos[measurement.PressureMeasurement.cluster_id] - await async_test_pressure(hass, device_info) - - # get the illuminance device info and test the associated sensor logic - device_info = zigpy_device_infos[measurement.IlluminanceMeasurement.cluster_id] - await async_test_illuminance(hass, device_info) - - # get the metering device info and test the associated sensor logic - device_info = zigpy_device_infos[smartenergy.Metering.cluster_id] - await async_test_metering(hass, device_info) - - # get the electrical_measurement device info and test the associated - # sensor logic - device_info = zigpy_device_infos[homeautomation.ElectricalMeasurement.cluster_id] - await async_test_electrical_measurement(hass, device_info) - - # test joining a new temperature sensor to the network - await async_test_device_join( - hass, zha_gateway, measurement.TemperatureMeasurement.cluster_id, entity_id - ) - - -async def async_build_devices(hass, zha_gateway, config_entry, cluster_ids): - """Build a zigpy device for each cluster id. - - This will build devices for all cluster ids that exist in cluster_ids. - They get added to the network and then the sensor component is loaded - which will cause sensor entities to get created for each device. - A dict containing relevant device info for testing is returned. It contains - the entity id, zigpy device, and the zigbee cluster for the sensor. - """ - - device_infos = {} - counter = 0 - for cluster_id in cluster_ids: - # create zigpy device - device_infos[cluster_id] = {"zigpy_device": None} - device_infos[cluster_id]["zigpy_device"] = await async_init_zigpy_device( - hass, - [cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, - ieee=f"00:15:8d:00:02:32:4f:0{counter}", - manufacturer=f"Fake{cluster_id}", - model=f"FakeModel{cluster_id}", - ) - - counter += 1 - - # load up sensor domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - - # put the other relevant info in the device info dict - for cluster_id in cluster_ids: - device_info = device_infos[cluster_id] - zigpy_device = device_info["zigpy_device"] - device_info["cluster"] = zigpy_device.endpoints.get(1).in_clusters[cluster_id] - zha_device = zha_gateway.get_device(zigpy_device.ieee) - device_info["zha_device"] = zha_device - device_info[ATTR_ENTITY_ID] = await find_entity_id(DOMAIN, zha_device, hass) - await hass.async_block_till_done() - return device_infos - - -async def async_test_humidity(hass, device_info): +async def async_test_humidity(hass, cluster, entity_id): """Test humidity sensor.""" - await send_attribute_report(hass, device_info["cluster"], 0, 1000) - assert_state(hass, device_info, "10.0", "%") + await send_attribute_report(hass, cluster, 0, 1000) + assert_state(hass, entity_id, "10.0", "%") -async def async_test_temperature(hass, device_info): +async def async_test_temperature(hass, cluster, entity_id): """Test temperature sensor.""" - await send_attribute_report(hass, device_info["cluster"], 0, 2900) - assert_state(hass, device_info, "29.0", "°C") + await send_attribute_report(hass, cluster, 0, 2900) + assert_state(hass, entity_id, "29.0", "°C") -async def async_test_pressure(hass, device_info): +async def async_test_pressure(hass, cluster, entity_id): """Test pressure sensor.""" - await send_attribute_report(hass, device_info["cluster"], 0, 1000) - assert_state(hass, device_info, "1000", "hPa") + await send_attribute_report(hass, cluster, 0, 1000) + assert_state(hass, entity_id, "1000", "hPa") -async def async_test_illuminance(hass, device_info): +async def async_test_illuminance(hass, cluster, entity_id): """Test illuminance sensor.""" - await send_attribute_report(hass, device_info["cluster"], 0, 10) - assert_state(hass, device_info, "1.0", "lx") + await send_attribute_report(hass, cluster, 0, 10) + assert_state(hass, entity_id, "1.0", "lx") -async def async_test_metering(hass, device_info): +async def async_test_metering(hass, cluster, entity_id): """Test metering sensor.""" - await send_attribute_report(hass, device_info["cluster"], 1024, 12345) - assert_state(hass, device_info, "12345.0", "unknown") + await send_attribute_report(hass, cluster, 1024, 12345) + assert_state(hass, entity_id, "12345.0", "unknown") -async def async_test_electrical_measurement(hass, device_info): +async def async_test_electrical_measurement(hass, cluster, entity_id): """Test electrical measurement sensor.""" with mock.patch( ( @@ -189,18 +71,81 @@ async def async_test_electrical_measurement(hass, device_info): new_callable=mock.PropertyMock, ) as divisor_mock: divisor_mock.return_value = 1 - await send_attribute_report(hass, device_info["cluster"], 1291, 100) - assert_state(hass, device_info, "100", "W") + await send_attribute_report(hass, cluster, 1291, 100) + assert_state(hass, entity_id, "100", "W") - await send_attribute_report(hass, device_info["cluster"], 1291, 99) - assert_state(hass, device_info, "99", "W") + await send_attribute_report(hass, cluster, 1291, 99) + assert_state(hass, entity_id, "99", "W") divisor_mock.return_value = 10 - await send_attribute_report(hass, device_info["cluster"], 1291, 1000) - assert_state(hass, device_info, "100", "W") + await send_attribute_report(hass, cluster, 1291, 1000) + assert_state(hass, entity_id, "100", "W") - await send_attribute_report(hass, device_info["cluster"], 1291, 99) - assert_state(hass, device_info, "9.9", "W") + await send_attribute_report(hass, cluster, 1291, 99) + assert_state(hass, entity_id, "9.9", "W") + + +@pytest.mark.parametrize( + "cluster_id, test_func, report_count", + ( + (measurement.RelativeHumidity.cluster_id, async_test_humidity, 1), + (measurement.TemperatureMeasurement.cluster_id, async_test_temperature, 1), + (measurement.PressureMeasurement.cluster_id, async_test_pressure, 1), + (measurement.IlluminanceMeasurement.cluster_id, async_test_illuminance, 1), + (smartenergy.Metering.cluster_id, async_test_metering, 1), + ( + homeautomation.ElectricalMeasurement.cluster_id, + async_test_electrical_measurement, + 1, + ), + ), +) +async def test_sensor( + hass, + zha_gateway, + zigpy_device_mock, + zha_device_joined_restored, + cluster_id, + test_func, + report_count, +): + """Test zha sensor platform.""" + + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [cluster_id, general.Basic.cluster_id], + "out_cluster": [], + "device_type": 0x0000, + } + } + ) + cluster = zigpy_device.endpoints[1].in_clusters[cluster_id] + zha_device = await zha_device_joined_restored(zigpy_device) + entity_id = await find_entity_id(DOMAIN, zha_device, hass) + + # ensure the sensor entity was created + assert hass.states.get(entity_id).state == STATE_UNAVAILABLE + + # allow traffic to flow through the gateway and devices + await async_enable_traffic(hass, zha_gateway, [zha_device]) + + # test that the sensor now have a state of unknown + assert hass.states.get(entity_id).state == STATE_UNKNOWN + + # test sensor associated logic + await test_func(hass, cluster, entity_id) + + # test rejoin + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_device) + await hass.async_block_till_done() + await test_func(hass, cluster, entity_id) + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + assert cluster.configure_reporting.call_count == report_count + assert cluster.configure_reporting.await_count == report_count async def send_attribute_report(hass, cluster, attrid, value): @@ -215,13 +160,13 @@ async def send_attribute_report(hass, cluster, attrid, value): await hass.async_block_till_done() -def assert_state(hass, device_info, state, unit_of_measurement): +def assert_state(hass, entity_id, state, unit_of_measurement): """Check that the state is what is expected. This is used to ensure that the logic in each sensor class handled the attribute report it received correctly. """ - hass_state = hass.states.get(device_info[ATTR_ENTITY_ID]) + hass_state = hass.states.get(entity_id) assert hass_state.state == state assert hass_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == unit_of_measurement @@ -282,7 +227,15 @@ def core_rs(hass_storage): ], ) async def test_temp_uom( - uom, raw_temp, expected, restore, hass_ms, config_entry, zha_gateway, core_rs + uom, + raw_temp, + expected, + restore, + hass_ms, + zha_gateway, + core_rs, + zigpy_device_mock, + zha_device_restored, ): """Test zha temperature sensor unit of measurement.""" @@ -294,17 +247,22 @@ async def test_temp_uom( CONF_UNIT_SYSTEM_METRIC if uom == TEMP_CELSIUS else CONF_UNIT_SYSTEM_IMPERIAL ) - # list of cluster ids to create devices and sensor entities for - temp_cluster = measurement.TemperatureMeasurement - cluster_ids = [temp_cluster.cluster_id] - - # devices that were created from cluster_ids list above - zigpy_device_infos = await async_build_devices( - hass, zha_gateway, config_entry, cluster_ids + zigpy_device = zigpy_device_mock( + { + 1: { + "in_clusters": [ + measurement.TemperatureMeasurement.cluster_id, + general.Basic.cluster_id, + ], + "out_cluster": [], + "device_type": 0x0000, + } + } ) + cluster = zigpy_device.endpoints[1].temperature + zha_device = await zha_device_restored(zigpy_device) + entity_id = await find_entity_id(DOMAIN, zha_device, hass) - zigpy_device_info = zigpy_device_infos[temp_cluster.cluster_id] - zha_device = zigpy_device_info["zha_device"] if not restore: assert hass.states.get(entity_id).state == STATE_UNAVAILABLE @@ -315,7 +273,7 @@ async def test_temp_uom( if not restore: assert hass.states.get(entity_id).state == STATE_UNKNOWN - await send_attribute_report(hass, zigpy_device_info["cluster"], 0, raw_temp) + await send_attribute_report(hass, cluster, 0, raw_temp) await hass.async_block_till_done() state = hass.states.get(entity_id) assert state is not None diff --git a/tests/components/zha/test_switch.py b/tests/components/zha/test_switch.py index 11a0b8f3481..f70538f65e8 100644 --- a/tests/components/zha/test_switch.py +++ b/tests/components/zha/test_switch.py @@ -1,6 +1,7 @@ """Test zha switch.""" from unittest.mock import call, patch +import pytest import zigpy.zcl.clusters.general as general import zigpy.zcl.foundation as zcl_f @@ -9,8 +10,6 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE from .common import ( async_enable_traffic, - async_init_zigpy_device, - async_test_device_join, find_entity_id, make_attribute, make_zcl_header, @@ -22,24 +21,24 @@ ON = 1 OFF = 0 -async def test_switch(hass, config_entry, zha_gateway): +@pytest.fixture +def zigpy_device(zigpy_device_mock): + """Device tracker zigpy device.""" + endpoints = { + 1: { + "in_clusters": [general.Basic.cluster_id, general.OnOff.cluster_id], + "out_clusters": [], + "device_type": 0, + } + } + return zigpy_device_mock(endpoints) + + +async def test_switch(hass, zha_gateway, zha_device_joined_restored, zigpy_device): """Test zha switch platform.""" - # create zigpy device - zigpy_device = await async_init_zigpy_device( - hass, - [general.OnOff.cluster_id, general.Basic.cluster_id], - [], - None, - zha_gateway, - ) - - # load up switch domain - await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN) - await hass.async_block_till_done() - + zha_device = await zha_device_joined_restored(zigpy_device) cluster = zigpy_device.endpoints.get(1).on_off - zha_device = zha_gateway.get_device(zigpy_device.ieee) entity_id = await find_entity_id(DOMAIN, zha_device, hass) assert entity_id is not None @@ -94,4 +93,11 @@ async def test_switch(hass, config_entry, zha_gateway): ) # test joining a new switch to the network and HA - await async_test_device_join(hass, zha_gateway, general.OnOff.cluster_id, entity_id) + cluster.bind.reset_mock() + cluster.configure_reporting.reset_mock() + await zha_gateway.async_device_initialized(zigpy_device) + await hass.async_block_till_done() + assert cluster.bind.call_count == 1 + assert cluster.bind.await_count == 1 + assert cluster.configure_reporting.call_count == 1 + assert cluster.configure_reporting.await_count == 1