ZHA tests refactoring (#31682)

* Fixtures for restoring/joning a device.

* binary_sensor.zha tests.

* cover.zha platform tests.

* device_tracker.zha platform tests.

* fan.zha platform tests.

* switch.zha platform tests.

* Update light.zha platform tests.

* Update sensor.zha platform tests.

* ZHA api tests refactoring.

* Update lock.zha platform tests.

* Update ZHA gateway tests.

* Update zha device action tests.

* Update zha device trigger tests.

* Cleanup.
This commit is contained in:
Alexei Chetroi 2020-02-09 21:45:35 -05:00 committed by GitHub
parent 118ba10442
commit 28eeed1db3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 648 additions and 741 deletions

View File

@ -1,6 +1,6 @@
"""Common test objects."""
import time
from unittest.mock import Mock, patch
from unittest.mock import Mock
from asynctest import CoroutineMock
import zigpy.profiles.zha
@ -18,8 +18,6 @@ from homeassistant.components.zha.core.const import (
)
from homeassistant.util import slugify
from tests.common import mock_coro
class FakeApplication:
"""Fake application for mocking zigpy."""
@ -100,66 +98,6 @@ class FakeDevice:
self.node_desc = zigpy.zdo.types.NodeDescriptor.deserialize(node_desc)[0]
def make_device(endpoints, ieee, manufacturer, model):
"""Make a fake device using the specified cluster classes."""
device = FakeDevice(ieee, manufacturer, model)
for epid, ep in endpoints.items():
endpoint = FakeEndpoint(manufacturer, model, epid)
endpoint.device = device
device.endpoints[epid] = endpoint
endpoint.device_type = ep["device_type"]
profile_id = ep.get("profile_id")
if profile_id:
endpoint.profile_id = profile_id
for cluster_id in ep.get("in_clusters", []):
endpoint.add_input_cluster(cluster_id)
for cluster_id in ep.get("out_clusters", []):
endpoint.add_output_cluster(cluster_id)
return device
async def async_init_zigpy_device(
hass,
in_cluster_ids,
out_cluster_ids,
device_type,
gateway,
ieee="00:0d:6f:00:0a:90:69:e7",
manufacturer="FakeManufacturer",
model="FakeModel",
is_new_join=False,
):
"""Create and initialize a device.
This creates a fake device and adds it to the "network". It can be used to
test existing device functionality and new device pairing functionality.
The is_new_join parameter influences whether or not the device will go
through cluster binding and zigbee cluster configure reporting. That only
happens when the device is paired to the network for the first time.
"""
device = make_device(
{
1: {
"in_clusters": in_cluster_ids,
"out_clusters": out_cluster_ids,
"device_type": device_type,
}
},
ieee,
manufacturer,
model,
)
if is_new_join:
await gateway.async_device_initialized(device)
else:
await gateway.async_device_restored(device)
await hass.async_block_till_done()
return device
def make_attribute(attrid, value, status=0):
"""Make an attribute."""
attr = zcl_f.Attribute()
@ -202,36 +140,6 @@ async def async_enable_traffic(hass, zha_gateway, zha_devices):
await hass.async_block_till_done()
async def async_test_device_join(
hass, zha_gateway, cluster_id, entity_id, device_type=None
):
"""Test a newly joining device.
This creates a new fake device and adds it to the network. It is meant to
simulate pairing a new device to the network so that code pathways that
only trigger during device joins can be tested.
"""
# create zigpy device mocking out the zigbee network operations
with patch(
"zigpy.zcl.Cluster.configure_reporting",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
with patch(
"zigpy.zcl.Cluster.bind",
return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]),
):
await async_init_zigpy_device(
hass,
[cluster_id, zigpy.zcl.clusters.general.Basic.cluster_id],
[],
device_type,
zha_gateway,
ieee="00:0d:6f:00:0a:90:69:f7",
is_new_join=True,
)
assert hass.states.get(entity_id) is not None
def make_zcl_header(command_id: int, global_command: bool = True) -> zcl_f.ZCLHeader:
"""Cluster.handle_message() ZCL Header helper."""
if global_command:

View File

@ -1,4 +1,5 @@
"""Test configuration for the ZHA component."""
import functools
from unittest import mock
from unittest.mock import patch
@ -7,7 +8,6 @@ import pytest
import zigpy
from zigpy.application import ControllerApplication
from homeassistant import config_entries
from homeassistant.components.zha.core.const import COMPONENTS, DATA_ZHA, DOMAIN
from homeassistant.components.zha.core.gateway import ZHAGateway
from homeassistant.components.zha.core.store import async_get_registry
@ -15,27 +15,39 @@ from homeassistant.helpers.device_registry import async_get_registry as get_dev_
from .common import FakeDevice, FakeEndpoint, async_setup_entry
from tests.common import MockConfigEntry
FIXTURE_GRP_ID = 0x1001
FIXTURE_GRP_NAME = "fixture group"
@pytest.fixture(name="config_entry")
def config_entry_fixture(hass):
async def config_entry_fixture(hass):
"""Fixture representing a config entry."""
config_entry = config_entries.ConfigEntry(
1,
DOMAIN,
"Mock Title",
{},
"test",
config_entries.CONN_CLASS_LOCAL_PUSH,
system_options={},
)
config_entry = MockConfigEntry(domain=DOMAIN)
config_entry.add_to_hass(hass)
return config_entry
@pytest.fixture
async def setup_zha(hass, config_entry):
"""Load the ZHA component.
This will init the ZHA component. It loads the component in HA so that
we can test the domains that ZHA supports without actually having a zigbee
network running.
"""
# this prevents needing an actual radio and zigbee network available
with patch("homeassistant.components.zha.async_setup_entry", async_setup_entry):
hass.data[DATA_ZHA] = {}
# init ZHA
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
@pytest.fixture(name="zha_gateway")
async def zha_gateway_fixture(hass, config_entry):
async def zha_gateway_fixture(hass, config_entry, setup_zha):
"""Fixture representing a zha gateway.
Create a ZHAGateway object that can be used to interact with as if we
@ -57,23 +69,6 @@ async def zha_gateway_fixture(hass, config_entry):
return gateway
@pytest.fixture(autouse=True)
async def setup_zha(hass, config_entry):
"""Load the ZHA component.
This will init the ZHA component. It loads the component in HA so that
we can test the domains that ZHA supports without actually having a zigbee
network running.
"""
# this prevents needing an actual radio and zigbee network available
with patch("homeassistant.components.zha.async_setup_entry", async_setup_entry):
hass.data[DATA_ZHA] = {}
# init ZHA
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
@pytest.fixture
def channel():
"""Channel mock factory fixture."""
@ -121,3 +116,43 @@ def zigpy_device_mock():
return device
return _mock_dev
@pytest.fixture
def _zha_device_restored_or_joined(hass, zha_gateway, config_entry):
"""Make a restored or joined ZHA devices."""
async def _zha_device(is_new_join, zigpy_dev):
if is_new_join:
for cmp in COMPONENTS:
await hass.config_entries.async_forward_entry_setup(config_entry, cmp)
await hass.async_block_till_done()
await zha_gateway.async_device_initialized(zigpy_dev)
else:
await zha_gateway.async_device_restored(zigpy_dev)
for cmp in COMPONENTS:
await hass.config_entries.async_forward_entry_setup(config_entry, cmp)
await hass.async_block_till_done()
return zha_gateway.get_device(zigpy_dev.ieee)
return _zha_device
@pytest.fixture
def zha_device_joined(_zha_device_restored_or_joined):
"""Return a newly joined ZHA device."""
return functools.partial(_zha_device_restored_or_joined, True)
@pytest.fixture
def zha_device_restored(_zha_device_restored_or_joined):
"""Return a restored ZHA device."""
return functools.partial(_zha_device_restored_or_joined, False)
@pytest.fixture(params=["zha_device_joined", "zha_device_restored"])
def zha_device_joined_restored(request):
"""Join or restore ZHA device."""
return request.getfixturevalue(request.param)

View File

@ -1,11 +1,9 @@
"""Test ZHA API."""
import pytest
import zigpy
import zigpy.profiles.zha
import zigpy.zcl.clusters.general as general
from homeassistant.components.light import DOMAIN as light_domain
from homeassistant.components.switch import DOMAIN
from homeassistant.components.websocket_api import const
from homeassistant.components.zha.api import ID, TYPE, async_load_api
from homeassistant.components.zha.core.const import (
@ -23,50 +21,67 @@ from homeassistant.components.zha.core.const import (
GROUP_NAME,
)
from .common import async_init_zigpy_device
from .conftest import FIXTURE_GRP_ID, FIXTURE_GRP_NAME
IEEE_SWITCH_DEVICE = "01:2d:6f:00:0a:90:69:e7"
IEEE_GROUPABLE_DEVICE = "01:2d:6f:00:0a:90:69:e8"
@pytest.fixture
async def zha_client(hass, config_entry, zha_gateway, hass_ws_client):
async def device_switch(hass, zha_gateway, zigpy_device_mock, zha_device_joined):
"""Test zha switch platform."""
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [general.OnOff.cluster_id, general.Basic.cluster_id],
"out_clusters": [],
"device_type": zigpy.profiles.zha.DeviceType.ON_OFF_SWITCH,
}
},
ieee=IEEE_SWITCH_DEVICE,
)
zha_device = await zha_device_joined(zigpy_device)
zha_device.set_available(True)
return zha_device
@pytest.fixture
async def device_groupable(hass, zha_gateway, zigpy_device_mock, zha_device_joined):
"""Test zha light platform."""
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [
general.OnOff.cluster_id,
general.Basic.cluster_id,
general.Groups.cluster_id,
],
"out_clusters": [],
"device_type": zigpy.profiles.zha.DeviceType.ON_OFF_SWITCH,
}
},
ieee=IEEE_GROUPABLE_DEVICE,
)
zha_device = await zha_device_joined(zigpy_device)
zha_device.set_available(True)
return zha_device
@pytest.fixture
async def zha_client(hass, hass_ws_client, device_switch, device_groupable):
"""Test zha switch platform."""
# load the ZHA API
async_load_api(hass)
# create zigpy device
await async_init_zigpy_device(
hass,
[general.OnOff.cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
)
await async_init_zigpy_device(
hass,
[general.OnOff.cluster_id, general.Basic.cluster_id, general.Groups.cluster_id],
[],
zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT,
zha_gateway,
manufacturer="FakeGroupManufacturer",
model="FakeGroupModel",
ieee="01:2d:6f:00:0a:90:69:e8",
)
# load up switch domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
await hass.config_entries.async_forward_entry_setup(config_entry, light_domain)
await hass.async_block_till_done()
return await hass_ws_client(hass)
async def test_device_clusters(hass, config_entry, zha_gateway, zha_client):
"""Test getting device cluster info."""
await zha_client.send_json(
{ID: 5, TYPE: "zha/devices/clusters", ATTR_IEEE: "00:0d:6f:00:0a:90:69:e7"}
{ID: 5, TYPE: "zha/devices/clusters", ATTR_IEEE: IEEE_SWITCH_DEVICE}
)
msg = await zha_client.receive_json()
@ -86,14 +101,14 @@ async def test_device_clusters(hass, config_entry, zha_gateway, zha_client):
assert cluster_info[ATTR_NAME] == "OnOff"
async def test_device_cluster_attributes(hass, config_entry, zha_gateway, zha_client):
async def test_device_cluster_attributes(zha_client):
"""Test getting device cluster attributes."""
await zha_client.send_json(
{
ID: 5,
TYPE: "zha/devices/clusters/attributes",
ATTR_ENDPOINT_ID: 1,
ATTR_IEEE: "00:0d:6f:00:0a:90:69:e7",
ATTR_IEEE: IEEE_SWITCH_DEVICE,
ATTR_CLUSTER_ID: 6,
ATTR_CLUSTER_TYPE: CLUSTER_TYPE_IN,
}
@ -109,14 +124,14 @@ async def test_device_cluster_attributes(hass, config_entry, zha_gateway, zha_cl
assert attribute[ATTR_NAME] is not None
async def test_device_cluster_commands(hass, config_entry, zha_gateway, zha_client):
async def test_device_cluster_commands(zha_client):
"""Test getting device cluster commands."""
await zha_client.send_json(
{
ID: 5,
TYPE: "zha/devices/clusters/commands",
ATTR_ENDPOINT_ID: 1,
ATTR_IEEE: "00:0d:6f:00:0a:90:69:e7",
ATTR_IEEE: IEEE_SWITCH_DEVICE,
ATTR_CLUSTER_ID: 6,
ATTR_CLUSTER_TYPE: CLUSTER_TYPE_IN,
}
@ -133,7 +148,7 @@ async def test_device_cluster_commands(hass, config_entry, zha_gateway, zha_clie
assert command[TYPE] is not None
async def test_list_devices(hass, config_entry, zha_gateway, zha_client):
async def test_list_devices(zha_client):
"""Test getting zha devices."""
await zha_client.send_json({ID: 5, TYPE: "zha/devices"})
@ -164,7 +179,7 @@ async def test_list_devices(hass, config_entry, zha_gateway, zha_client):
assert device == device2
async def test_device_not_found(hass, config_entry, zha_gateway, zha_client):
async def test_device_not_found(zha_client):
"""Test not found response from get device API."""
await zha_client.send_json(
{ID: 6, TYPE: "zha/device", ATTR_IEEE: "28:6d:97:00:01:04:11:8c"}
@ -176,7 +191,7 @@ async def test_device_not_found(hass, config_entry, zha_gateway, zha_client):
assert msg["error"]["code"] == const.ERR_NOT_FOUND
async def test_list_groups(hass, config_entry, zha_gateway, zha_client):
async def test_list_groups(zha_client):
"""Test getting zha zigbee groups."""
await zha_client.send_json({ID: 7, TYPE: "zha/groups"})
@ -193,7 +208,7 @@ async def test_list_groups(hass, config_entry, zha_gateway, zha_client):
assert group["members"] == []
async def test_get_group(hass, config_entry, zha_gateway, zha_client):
async def test_get_group(zha_client):
"""Test getting a specific zha zigbee group."""
await zha_client.send_json({ID: 8, TYPE: "zha/group", GROUP_ID: FIXTURE_GRP_ID})
@ -208,7 +223,7 @@ async def test_get_group(hass, config_entry, zha_gateway, zha_client):
assert group["members"] == []
async def test_get_group_not_found(hass, config_entry, zha_gateway, zha_client):
async def test_get_group_not_found(zha_client):
"""Test not found response from get group API."""
await zha_client.send_json({ID: 9, TYPE: "zha/group", GROUP_ID: 1234567})
@ -220,14 +235,9 @@ async def test_get_group_not_found(hass, config_entry, zha_gateway, zha_client):
assert msg["error"]["code"] == const.ERR_NOT_FOUND
async def test_list_groupable_devices(hass, config_entry, zha_gateway, zha_client):
async def test_list_groupable_devices(zha_client, device_groupable):
"""Test getting zha devices that have a group cluster."""
# Make device available
zha_gateway.devices[
zigpy.types.EUI64.convert("01:2d:6f:00:0a:90:69:e8")
].set_available(True)
await zha_client.send_json({ID: 10, TYPE: "zha/devices/groupable"})
msg = await zha_client.receive_json()
@ -251,9 +261,7 @@ async def test_list_groupable_devices(hass, config_entry, zha_gateway, zha_clien
# Make sure there are no groupable devices when the device is unavailable
# Make device unavailable
zha_gateway.devices[
zigpy.types.EUI64.convert("01:2d:6f:00:0a:90:69:e8")
].set_available(False)
device_groupable.set_available(False)
await zha_client.send_json({ID: 11, TYPE: "zha/devices/groupable"})
@ -265,7 +273,7 @@ async def test_list_groupable_devices(hass, config_entry, zha_gateway, zha_clien
assert len(devices) == 0
async def test_add_group(hass, config_entry, zha_gateway, zha_client):
async def test_add_group(zha_client):
"""Test adding and getting a new zha zigbee group."""
await zha_client.send_json({ID: 12, TYPE: "zha/group/add", GROUP_NAME: "new_group"})
@ -291,7 +299,7 @@ async def test_add_group(hass, config_entry, zha_gateway, zha_client):
assert group["name"] == FIXTURE_GRP_NAME or group["name"] == "new_group"
async def test_remove_group(hass, config_entry, zha_gateway, zha_client):
async def test_remove_group(zha_client):
"""Test removing a new zha zigbee group."""
await zha_client.send_json({ID: 14, TYPE: "zha/groups"})

View File

@ -1,5 +1,5 @@
"""Test zha binary sensor."""
import zigpy.zcl.clusters.general as general
import pytest
import zigpy.zcl.clusters.measurement as measurement
import zigpy.zcl.clusters.security as security
import zigpy.zcl.foundation as zcl_f
@ -9,76 +9,27 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
)
DEVICE_IAS = {
1: {
"device_type": 1026,
"in_clusters": [security.IasZone.cluster_id],
"out_clusters": [],
}
}
async def test_binary_sensor(hass, config_entry, zha_gateway):
"""Test zha binary_sensor platform."""
# create zigpy devices
zigpy_device_zone = await async_init_zigpy_device(
hass,
[security.IasZone.cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
ieee="00:0d:6f:11:9a:90:69:e6",
)
zigpy_device_occupancy = await async_init_zigpy_device(
hass,
[measurement.OccupancySensing.cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
ieee="00:0d:6f:11:9a:90:69:e7",
manufacturer="FakeOccupancy",
model="FakeOccupancyModel",
)
# load up binary_sensor domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
# on off binary_sensor
zone_cluster = zigpy_device_zone.endpoints.get(1).ias_zone
zone_zha_device = zha_gateway.get_device(zigpy_device_zone.ieee)
zone_entity_id = await find_entity_id(DOMAIN, zone_zha_device, hass)
assert zone_entity_id is not None
# occupancy binary_sensor
occupancy_cluster = zigpy_device_occupancy.endpoints.get(1).occupancy
occupancy_zha_device = zha_gateway.get_device(zigpy_device_occupancy.ieee)
occupancy_entity_id = await find_entity_id(DOMAIN, occupancy_zha_device, hass)
assert occupancy_entity_id is not None
# test that the sensors exist and are in the unavailable state
assert hass.states.get(zone_entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(occupancy_entity_id).state == STATE_UNAVAILABLE
await async_enable_traffic(
hass, zha_gateway, [zone_zha_device, occupancy_zha_device]
)
# test that the sensors exist and are in the off state
assert hass.states.get(zone_entity_id).state == STATE_OFF
assert hass.states.get(occupancy_entity_id).state == STATE_OFF
# test getting messages that trigger and reset the sensors
await async_test_binary_sensor_on_off(hass, occupancy_cluster, occupancy_entity_id)
# test IASZone binary sensors
await async_test_iaszone_on_off(hass, zone_cluster, zone_entity_id)
# test new sensor join
await async_test_device_join(
hass, zha_gateway, measurement.OccupancySensing.cluster_id, occupancy_entity_id
)
DEVICE_OCCUPANCY = {
1: {
"device_type": 263,
"in_clusters": [measurement.OccupancySensing.cluster_id],
"out_clusters": [],
}
}
async def async_test_binary_sensor_on_off(hass, cluster, entity_id):
@ -109,3 +60,52 @@ async def async_test_iaszone_on_off(hass, cluster, entity_id):
cluster.listener_event("cluster_command", 1, 0, [0])
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
@pytest.mark.parametrize(
"device, on_off_test, cluster_name, reporting",
[
(DEVICE_IAS, async_test_iaszone_on_off, "ias_zone", False),
(DEVICE_OCCUPANCY, async_test_binary_sensor_on_off, "occupancy", True),
],
)
async def test_binary_sensor(
hass,
zha_gateway,
zigpy_device_mock,
zha_device_joined_restored,
device,
on_off_test,
cluster_name,
reporting,
):
"""Test ZHA binary_sensor platform."""
zigpy_device = zigpy_device_mock(device)
zha_device = await zha_device_joined_restored(zigpy_device)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
# test that the sensors exist and are in the unavailable state
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
await async_enable_traffic(hass, zha_gateway, [zha_device])
# test that the sensors exist and are in the off state
assert hass.states.get(entity_id).state == STATE_OFF
# test getting messages that trigger and reset the sensors
cluster = getattr(zigpy_device.endpoints[1], cluster_name)
await on_off_test(hass, cluster, entity_id)
# test rejoin
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
if reporting:
assert cluster.configure_reporting.call_count > 0
assert cluster.configure_reporting.await_count > 0

View File

@ -1,9 +1,10 @@
"""Test zha cover."""
from unittest.mock import MagicMock, call, patch
import asynctest
import pytest
import zigpy.types
import zigpy.zcl.clusters.closures as closures
import zigpy.zcl.clusters.general as general
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.cover import DOMAIN
@ -11,8 +12,6 @@ from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
@ -21,17 +20,27 @@ from .common import (
from tests.common import mock_coro
async def test_cover(hass, config_entry, zha_gateway):
"""Test zha cover platform."""
@pytest.fixture
def zigpy_cover_device(zigpy_device_mock):
"""Zigpy cover device."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass,
[closures.WindowCovering.cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
)
endpoints = {
1: {
"device_type": 1026,
"in_clusters": [closures.WindowCovering.cluster_id],
"out_clusters": [],
}
}
return zigpy_device_mock(endpoints)
@asynctest.patch(
"homeassistant.components.zha.core.channels.closures.WindowCovering.async_initialize"
)
async def test_cover(
m1, hass, zha_gateway, zha_device_joined_restored, zigpy_cover_device
):
"""Test zha cover platform."""
async def get_chan_attr(*args, **kwargs):
return 100
@ -41,13 +50,11 @@ async def test_cover(hass, config_entry, zha_gateway):
new=MagicMock(side_effect=get_chan_attr),
) as get_attr_mock:
# load up cover domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert get_attr_mock.call_count == 2
assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage"
cluster = zigpy_device.endpoints.get(1).window_covering
zha_device = zha_gateway.get_device(zigpy_device.ieee)
cluster = zigpy_cover_device.endpoints.get(1).window_covering
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
@ -124,6 +131,13 @@ async def test_cover(hass, config_entry, zha_gateway):
False, 0x2, (), expect_reply=True, manufacturer=None
)
await async_test_device_join(
hass, zha_gateway, closures.WindowCovering.cluster_id, entity_id
)
# test rejoin
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_cover_device)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OPEN
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 1
assert cluster.configure_reporting.await_count == 1

View File

@ -11,12 +11,10 @@ from homeassistant.components.device_automation import (
_async_get_device_automations as async_get_device_automations,
)
from homeassistant.components.zha import DOMAIN
from homeassistant.components.zha.core.const import CHANNEL_ON_OFF
from homeassistant.components.zha.core.const import CHANNEL_EVENT_RELAY
from homeassistant.helpers.device_registry import async_get_registry
from homeassistant.setup import async_setup_component
from .common import async_enable_traffic, async_init_zigpy_device
from tests.common import async_mock_service, mock_coro
SHORT_PRESS = "remote_button_short_press"
@ -30,28 +28,31 @@ def calls(hass):
return async_mock_service(hass, "zha", "warning_device_warn")
async def test_get_actions(hass, config_entry, zha_gateway):
"""Test we get the expected actions from a zha device."""
@pytest.fixture
async def device_ias(hass, zha_gateway, zigpy_device_mock, zha_device_joined_restored):
"""IAS device fixture."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass,
[
general.Basic.cluster_id,
security.IasZone.cluster_id,
security.IasWd.cluster_id,
],
[],
None,
zha_gateway,
clusters = [general.Basic, security.IasZone, security.IasWd]
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [c.cluster_id for c in clusters],
"out_clusters": [general.OnOff.cluster_id],
"device_type": 0,
}
},
)
await hass.config_entries.async_forward_entry_setup(config_entry, "binary_sensor")
zha_device = await zha_device_joined_restored(zigpy_device)
zha_device.update_available(True)
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
return zigpy_device, zha_device
zha_device = zha_gateway.get_device(zigpy_device.ieee)
ieee_address = str(zha_device.ieee)
async def test_get_actions(hass, device_ias):
"""Test we get the expected actions from a zha device."""
ieee_address = str(device_ias[0].ieee)
ha_device_registry = await async_get_registry(hass)
reg_device = ha_device_registry.async_get_device({(DOMAIN, ieee_address)}, set())
@ -66,40 +67,19 @@ async def test_get_actions(hass, config_entry, zha_gateway):
assert actions == expected_actions
async def test_action(hass, config_entry, zha_gateway, calls):
async def test_action(hass, calls, device_ias):
"""Test for executing a zha device action."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass,
[
general.Basic.cluster_id,
security.IasZone.cluster_id,
security.IasWd.cluster_id,
],
[general.OnOff.cluster_id],
None,
zha_gateway,
)
zigpy_device, zha_device = device_ias
zigpy_device.device_automation_triggers = {
(SHORT_PRESS, SHORT_PRESS): {COMMAND: COMMAND_SINGLE}
}
await hass.config_entries.async_forward_entry_setup(config_entry, "switch")
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
zha_device = zha_gateway.get_device(zigpy_device.ieee)
ieee_address = str(zha_device.ieee)
ha_device_registry = await async_get_registry(hass)
reg_device = ha_device_registry.async_get_device({(DOMAIN, ieee_address)}, set())
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x00, zcl_f.Status.SUCCESS]),
@ -129,8 +109,8 @@ async def test_action(hass, config_entry, zha_gateway, calls):
await hass.async_block_till_done()
on_off_channel = zha_device.cluster_channels[CHANNEL_ON_OFF]
on_off_channel.zha_send_event(on_off_channel.cluster, COMMAND_SINGLE, [])
channel = {ch.name: ch for ch in zha_device.all_channels}[CHANNEL_EVENT_RELAY]
channel.zha_send_event(channel.cluster, COMMAND_SINGLE, [])
await hass.async_block_till_done()
assert len(calls) == 1

View File

@ -2,6 +2,7 @@
from datetime import timedelta
import time
import pytest
import zigpy.zcl.clusters.general as general
import zigpy.zcl.foundation as zcl_f
@ -14,8 +15,6 @@ import homeassistant.util.dt as dt_util
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
@ -24,37 +23,39 @@ from .common import (
from tests.common import async_fire_time_changed
async def test_device_tracker(hass, config_entry, zha_gateway):
@pytest.fixture
def zigpy_device_dt(zigpy_device_mock):
"""Device tracker zigpy device."""
endpoints = {
1: {
"in_clusters": [
general.Basic.cluster_id,
general.PowerConfiguration.cluster_id,
general.Identify.cluster_id,
general.PollControl.cluster_id,
general.BinaryInput.cluster_id,
],
"out_clusters": [general.Identify.cluster_id, general.Ota.cluster_id],
"device_type": SMARTTHINGS_ARRIVAL_SENSOR_DEVICE_TYPE,
}
}
return zigpy_device_mock(endpoints)
async def test_device_tracker(
hass, zha_gateway, zha_device_joined_restored, zigpy_device_dt
):
"""Test zha device tracker platform."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass,
[
general.Basic.cluster_id,
general.PowerConfiguration.cluster_id,
general.Identify.cluster_id,
general.PollControl.cluster_id,
general.BinaryInput.cluster_id,
],
[general.Identify.cluster_id, general.Ota.cluster_id],
SMARTTHINGS_ARRIVAL_SENSOR_DEVICE_TYPE,
zha_gateway,
)
# load up device tracker domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
cluster = zigpy_device.endpoints.get(1).power
zha_device = zha_gateway.get_device(zigpy_device.ieee)
zha_device = await zha_device_joined_restored(zigpy_device_dt)
cluster = zigpy_device_dt.endpoints.get(1).power
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
# test that the device tracker was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
zigpy_device.last_seen = time.time() - 120
zigpy_device_dt.last_seen = time.time() - 120
next_update = dt_util.utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, next_update)
await hass.async_block_till_done()
@ -73,7 +74,7 @@ async def test_device_tracker(hass, config_entry, zha_gateway):
attr = make_attribute(0x0021, 200)
cluster.handle_message(hdr, [[attr]])
zigpy_device.last_seen = time.time() + 10
zigpy_device_dt.last_seen = time.time() + 10
next_update = dt_util.utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, next_update)
await hass.async_block_till_done()
@ -87,10 +88,12 @@ async def test_device_tracker(hass, config_entry, zha_gateway):
assert entity.battery_level == 100
# test adding device tracker to the network and HA
await async_test_device_join(
hass,
zha_gateway,
general.PowerConfiguration.cluster_id,
entity_id,
SMARTTHINGS_ARRIVAL_SENSOR_DEVICE_TYPE,
)
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device_dt)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_HOME
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 2
assert cluster.configure_reporting.await_count == 2

View File

@ -3,13 +3,10 @@ import pytest
import zigpy.zcl.clusters.general as general
import homeassistant.components.automation as automation
from homeassistant.components.switch import DOMAIN
from homeassistant.components.zha.core.const import CHANNEL_ON_OFF
from homeassistant.components.zha.core.const import CHANNEL_EVENT_RELAY
from homeassistant.helpers.device_registry import async_get_registry
from homeassistant.setup import async_setup_component
from .common import async_enable_traffic, async_init_zigpy_device
from tests.common import async_get_device_automations, async_mock_service
ON = 1
@ -42,13 +39,31 @@ def calls(hass):
return async_mock_service(hass, "test", "automation")
async def test_triggers(hass, config_entry, zha_gateway):
@pytest.fixture(params=["zha_device_joined", "zha_device_restored"])
async def mock_devices(hass, zha_gateway, zigpy_device_mock, request):
"""IAS device fixture."""
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [general.Basic.cluster_id],
"out_clusters": [general.OnOff.cluster_id],
"device_type": 0,
}
},
)
join_or_restore = request.getfixturevalue(request.param)
zha_device = await join_or_restore(zigpy_device)
zha_device.update_available(True)
await hass.async_block_till_done()
return zigpy_device, zha_device
async def test_triggers(hass, mock_devices):
"""Test zha device triggers."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway
)
zigpy_device, zha_device = mock_devices
zigpy_device.device_automation_triggers = {
(SHAKEN, SHAKEN): {COMMAND: COMMAND_SHAKE},
@ -58,11 +73,6 @@ async def test_triggers(hass, config_entry, zha_gateway):
(LONG_RELEASE, LONG_RELEASE): {COMMAND: COMMAND_HOLD},
}
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
zha_device = zha_gateway.get_device(zigpy_device.ieee)
ieee_address = str(zha_device.ieee)
ha_device_registry = await async_get_registry(hass)
@ -110,19 +120,10 @@ async def test_triggers(hass, config_entry, zha_gateway):
assert _same_lists(triggers, expected_triggers)
async def test_no_triggers(hass, config_entry, zha_gateway):
async def test_no_triggers(hass, mock_devices):
"""Test zha device with no triggers."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway
)
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
zha_device = zha_gateway.get_device(zigpy_device.ieee)
_, zha_device = mock_devices
ieee_address = str(zha_device.ieee)
ha_device_registry = await async_get_registry(hass)
@ -132,13 +133,10 @@ async def test_no_triggers(hass, config_entry, zha_gateway):
assert triggers == []
async def test_if_fires_on_event(hass, config_entry, zha_gateway, calls):
async def test_if_fires_on_event(hass, mock_devices, calls):
"""Test for remote triggers firing."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway
)
zigpy_device, zha_device = mock_devices
zigpy_device.device_automation_triggers = {
(SHAKEN, SHAKEN): {COMMAND: COMMAND_SHAKE},
@ -148,15 +146,6 @@ async def test_if_fires_on_event(hass, config_entry, zha_gateway, calls):
(LONG_RELEASE, LONG_RELEASE): {COMMAND: COMMAND_HOLD},
}
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
zha_device = zha_gateway.get_device(zigpy_device.ieee)
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
ieee_address = str(zha_device.ieee)
ha_device_registry = await async_get_registry(hass)
reg_device = ha_device_registry.async_get_device({("zha", ieee_address)}, set())
@ -185,30 +174,18 @@ async def test_if_fires_on_event(hass, config_entry, zha_gateway, calls):
await hass.async_block_till_done()
on_off_channel = zha_device.cluster_channels[CHANNEL_ON_OFF]
on_off_channel.zha_send_event(on_off_channel.cluster, COMMAND_SINGLE, [])
channel = {ch.name: ch for ch in zha_device.all_channels}[CHANNEL_EVENT_RELAY]
channel.zha_send_event(channel.cluster, COMMAND_SINGLE, [])
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["message"] == "service called"
async def test_exception_no_triggers(hass, config_entry, zha_gateway, calls, caplog):
async def test_exception_no_triggers(hass, mock_devices, calls, caplog):
"""Test for exception on event triggers firing."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway
)
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
zha_device = zha_gateway.get_device(zigpy_device.ieee)
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
_, zha_device = mock_devices
ieee_address = str(zha_device.ieee)
ha_device_registry = await async_get_registry(hass)
@ -239,13 +216,10 @@ async def test_exception_no_triggers(hass, config_entry, zha_gateway, calls, cap
assert "Invalid config for [automation]" in caplog.text
async def test_exception_bad_trigger(hass, config_entry, zha_gateway, calls, caplog):
async def test_exception_bad_trigger(hass, mock_devices, calls, caplog):
"""Test for exception on event triggers firing."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [general.Basic.cluster_id], [general.OnOff.cluster_id], None, zha_gateway
)
zigpy_device, zha_device = mock_devices
zigpy_device.device_automation_triggers = {
(SHAKEN, SHAKEN): {COMMAND: COMMAND_SHAKE},
@ -255,15 +229,6 @@ async def test_exception_bad_trigger(hass, config_entry, zha_gateway, calls, cap
(LONG_RELEASE, LONG_RELEASE): {COMMAND: COMMAND_HOLD},
}
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
hass.config_entries._entries.append(config_entry)
zha_device = zha_gateway.get_device(zigpy_device.ieee)
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
ieee_address = str(zha_device.ieee)
ha_device_registry = await async_get_registry(hass)
reg_device = ha_device_registry.async_get_device({("zha", ieee_address)}, set())

View File

@ -1,7 +1,7 @@
"""Test zha fan."""
from unittest.mock import call, patch
import zigpy.zcl.clusters.general as general
import pytest
import zigpy.zcl.clusters.hvac as hvac
import zigpy.zcl.foundation as zcl_f
@ -18,8 +18,6 @@ from homeassistant.const import (
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
@ -28,20 +26,20 @@ from .common import (
from tests.common import mock_coro
async def test_fan(hass, config_entry, zha_gateway):
@pytest.fixture
def zigpy_device(zigpy_device_mock):
"""Device tracker zigpy device."""
endpoints = {
1: {"in_clusters": [hvac.Fan.cluster_id], "out_clusters": [], "device_type": 0}
}
return zigpy_device_mock(endpoints)
async def test_fan(hass, zha_gateway, zha_device_joined_restored, zigpy_device):
"""Test zha fan platform."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [hvac.Fan.cluster_id, general.Basic.cluster_id], [], None, zha_gateway
)
# load up fan domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
zha_device = await zha_device_joined_restored(zigpy_device)
cluster = zigpy_device.endpoints.get(1).fan
zha_device = zha_gateway.get_device(zigpy_device.ieee)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
@ -98,7 +96,14 @@ async def test_fan(hass, config_entry, zha_gateway):
assert cluster.write_attributes.call_args == call({"fan_mode": 3})
# test adding new fan to the network and HA
await async_test_device_join(hass, zha_gateway, hvac.Fan.cluster_id, entity_id)
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 1
assert cluster.configure_reporting.await_count == 1
async def async_turn_on(hass, entity_id, speed=None):

View File

@ -1,29 +1,39 @@
"""Test ZHA Gateway."""
import pytest
import zigpy.zcl.clusters.general as general
import homeassistant.components.zha.core.const as zha_const
from .common import async_enable_traffic, async_init_zigpy_device
from .common import async_enable_traffic
async def test_device_left(hass, config_entry, zha_gateway):
"""Test zha fan platform."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass, [general.Basic.cluster_id], [], None, zha_gateway
@pytest.fixture
def zigpy_dev_basic(zigpy_device_mock):
"""Zigpy device with just a basic cluster."""
return zigpy_device_mock(
{
1: {
"in_clusters": [general.Basic.cluster_id],
"out_clusters": [],
"device_type": 0,
}
},
)
# load up fan domain
await hass.config_entries.async_forward_entry_setup(config_entry, zha_const.SENSOR)
await hass.async_block_till_done()
zha_device = zha_gateway.get_device(zigpy_device.ieee)
@pytest.fixture
async def zha_dev_basic(hass, zha_device_restored, zigpy_dev_basic):
"""ZHA device with just a basic cluster."""
assert zha_device.available is False
zha_device = await zha_device_restored(zigpy_dev_basic)
return zha_device
await async_enable_traffic(hass, zha_gateway, [zha_device])
assert zha_device.available is True
zha_gateway.device_left(zigpy_device)
assert zha_device.available is False
async def test_device_left(hass, zha_gateway, zigpy_dev_basic, zha_dev_basic):
"""Device leaving the network should become unavailable."""
assert zha_dev_basic.available is False
await async_enable_traffic(hass, zha_gateway, [zha_dev_basic])
assert zha_dev_basic.available is True
zha_gateway.device_left(zigpy_dev_basic)
assert zha_dev_basic.available is False

View File

@ -1,10 +1,12 @@
"""Test zha light."""
import asyncio
from unittest.mock import MagicMock, call, patch, sentinel
from unittest.mock import call, sentinel
import asynctest
import pytest
import zigpy.profiles.zha
import zigpy.types
import zigpy.zcl.clusters.general as general
import zigpy.zcl.clusters.lighting as lighting
import zigpy.zcl.foundation as zcl_f
from homeassistant.components.light import DOMAIN
@ -12,121 +14,124 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
)
from tests.common import mock_coro
ON = 1
OFF = 0
LIGHT_ON_OFF = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT,
"in_clusters": [general.Basic.cluster_id, general.OnOff.cluster_id],
"out_clusters": [general.Ota.cluster_id],
}
}
async def test_light(hass, config_entry, zha_gateway, monkeypatch):
LIGHT_LEVEL = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.DIMMABLE_LIGHT,
"in_clusters": [
general.Basic.cluster_id,
general.LevelControl.cluster_id,
general.OnOff.cluster_id,
],
"out_clusters": [general.Ota.cluster_id],
}
}
LIGHT_COLOR = {
1: {
"device_type": zigpy.profiles.zha.DeviceType.COLOR_DIMMABLE_LIGHT,
"in_clusters": [
general.Basic.cluster_id,
general.LevelControl.cluster_id,
general.OnOff.cluster_id,
lighting.Color.cluster_id,
],
"out_clusters": [general.Ota.cluster_id],
}
}
@asynctest.patch(
"zigpy.zcl.clusters.lighting.Color.request",
new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]),
)
@asynctest.patch(
"zigpy.zcl.clusters.general.LevelControl.request",
new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]),
)
@asynctest.patch(
"zigpy.zcl.clusters.general.OnOff.request",
new=asynctest.CoroutineMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS]),
)
@pytest.mark.parametrize(
"device, reporting",
[(LIGHT_ON_OFF, (1, 0, 0)), (LIGHT_LEVEL, (1, 1, 0)), (LIGHT_COLOR, (1, 1, 3))],
)
async def test_light(
hass, zha_gateway, zigpy_device_mock, zha_device_joined_restored, device, reporting,
):
"""Test zha light platform."""
# create zigpy devices
zigpy_device_on_off = await async_init_zigpy_device(
hass,
[general.OnOff.cluster_id, general.Basic.cluster_id],
[],
zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT,
zha_gateway,
ieee="00:0d:6f:11:0a:90:69:e6",
)
zigpy_device = zigpy_device_mock(device)
zha_device = await zha_device_joined_restored(zigpy_device)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
zigpy_device_level = await async_init_zigpy_device(
hass,
[
general.OnOff.cluster_id,
general.LevelControl.cluster_id,
general.Basic.cluster_id,
],
[],
zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT,
zha_gateway,
ieee="00:0d:6f:11:0a:90:69:e7",
manufacturer="FakeLevelManufacturer",
model="FakeLevelModel",
)
assert entity_id is not None
# load up light domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
# on off light
on_off_device_on_off_cluster = zigpy_device_on_off.endpoints.get(1).on_off
on_off_zha_device = zha_gateway.get_device(zigpy_device_on_off.ieee)
on_off_entity_id = await find_entity_id(DOMAIN, on_off_zha_device, hass)
assert on_off_entity_id is not None
# dimmable light
level_device_on_off_cluster = zigpy_device_level.endpoints.get(1).on_off
level_device_level_cluster = zigpy_device_level.endpoints.get(1).level
on_off_mock = MagicMock(
side_effect=asyncio.coroutine(
MagicMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS])
)
)
level_mock = MagicMock(
side_effect=asyncio.coroutine(
MagicMock(return_value=[sentinel.data, zcl_f.Status.SUCCESS])
)
)
monkeypatch.setattr(level_device_on_off_cluster, "request", on_off_mock)
monkeypatch.setattr(level_device_level_cluster, "request", level_mock)
level_zha_device = zha_gateway.get_device(zigpy_device_level.ieee)
level_entity_id = await find_entity_id(DOMAIN, level_zha_device, hass)
assert level_entity_id is not None
cluster_on_off = zigpy_device.endpoints[1].on_off
cluster_level = getattr(zigpy_device.endpoints[1], "level", None)
cluster_color = getattr(zigpy_device.endpoints[1], "light_color", None)
# test that the lights were created and that they are unavailable
assert hass.states.get(on_off_entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(level_entity_id).state == STATE_UNAVAILABLE
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [on_off_zha_device, level_zha_device])
await async_enable_traffic(hass, zha_gateway, [zha_device])
# test that the lights were created and are off
assert hass.states.get(on_off_entity_id).state == STATE_OFF
assert hass.states.get(level_entity_id).state == STATE_OFF
assert hass.states.get(entity_id).state == STATE_OFF
# test turning the lights on and off from the light
await async_test_on_off_from_light(
hass, on_off_device_on_off_cluster, on_off_entity_id
)
await async_test_on_off_from_light(
hass, level_device_on_off_cluster, level_entity_id
)
await async_test_on_off_from_light(hass, cluster_on_off, entity_id)
# test turning the lights on and off from the HA
await async_test_on_off_from_hass(
hass, on_off_device_on_off_cluster, on_off_entity_id
)
await async_test_on_off_from_hass(hass, cluster_on_off, entity_id)
await async_test_level_on_off_from_hass(
hass, level_device_on_off_cluster, level_device_level_cluster, level_entity_id
)
if cluster_level:
await async_test_level_on_off_from_hass(
hass, cluster_on_off, cluster_level, entity_id
)
# test turning the lights on and off from the light
await async_test_on_from_light(hass, level_device_on_off_cluster, level_entity_id)
# test getting a brightness change from the network
await async_test_on_from_light(hass, cluster_on_off, entity_id)
await async_test_dimmer_from_light(
hass, cluster_level, entity_id, 150, STATE_ON
)
# test getting a brightness change from the network
await async_test_dimmer_from_light(
hass, level_device_level_cluster, level_entity_id, 150, STATE_ON
)
# test adding a new light to the network and HA
await async_test_device_join(
hass,
zha_gateway,
general.OnOff.cluster_id,
on_off_entity_id,
device_type=zigpy.profiles.zha.DeviceType.ON_OFF_LIGHT,
)
# test rejoin
await async_test_off_from_hass(hass, cluster_on_off, entity_id)
clusters = [cluster_on_off]
if cluster_level:
clusters.append(cluster_level)
if cluster_color:
clusters.append(cluster_color)
for cluster in clusters:
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_OFF
for cluster, reporting_count in zip(clusters, reporting):
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == reporting_count
assert cluster.configure_reporting.await_count == reporting_count
async def async_test_on_off_from_light(hass, cluster, entity_id):
@ -157,36 +162,33 @@ async def async_test_on_from_light(hass, cluster, entity_id):
async def async_test_on_off_from_hass(hass, cluster, entity_id):
"""Test on off functionality from hass."""
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x00, zcl_f.Status.SUCCESS]),
):
# turn on via UI
await hass.services.async_call(
DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, ON, (), expect_reply=True, manufacturer=None
)
# turn on via UI
cluster.request.reset_mock()
await hass.services.async_call(
DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.await_count == 1
assert cluster.request.call_args == call(
False, ON, (), expect_reply=True, manufacturer=None
)
await async_test_off_from_hass(hass, cluster, entity_id)
async def async_test_off_from_hass(hass, cluster, entity_id):
"""Test turning off the light from Home Assistant."""
with patch(
"zigpy.zcl.Cluster.request",
return_value=mock_coro([0x01, zcl_f.Status.SUCCESS]),
):
# turn off via UI
await hass.services.async_call(
DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.call_args == call(
False, OFF, (), expect_reply=True, manufacturer=None
)
# turn off via UI
cluster.request.reset_mock()
await hass.services.async_call(
DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert cluster.request.call_count == 1
assert cluster.request.await_count == 1
assert cluster.request.call_args == call(
False, OFF, (), expect_reply=True, manufacturer=None
)
async def async_test_level_on_off_from_hass(
@ -194,12 +196,15 @@ async def async_test_level_on_off_from_hass(
):
"""Test on off functionality from hass."""
on_off_cluster.request.reset_mock()
# turn on via UI
await hass.services.async_call(
DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert on_off_cluster.request.call_count == 1
assert on_off_cluster.request.await_count == 1
assert level_cluster.request.call_count == 0
assert level_cluster.request.await_count == 0
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None
)
@ -210,7 +215,9 @@ async def async_test_level_on_off_from_hass(
DOMAIN, "turn_on", {"entity_id": entity_id, "transition": 10}, blocking=True
)
assert on_off_cluster.request.call_count == 1
assert on_off_cluster.request.await_count == 1
assert level_cluster.request.call_count == 1
assert level_cluster.request.await_count == 1
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None
)
@ -230,7 +237,9 @@ async def async_test_level_on_off_from_hass(
DOMAIN, "turn_on", {"entity_id": entity_id, "brightness": 10}, blocking=True
)
assert on_off_cluster.request.call_count == 1
assert on_off_cluster.request.await_count == 1
assert level_cluster.request.call_count == 1
assert level_cluster.request.await_count == 1
assert on_off_cluster.request.call_args == call(
False, 1, (), expect_reply=True, manufacturer=None
)

View File

@ -1,6 +1,8 @@
"""Test zha lock."""
from unittest.mock import patch
import pytest
import zigpy.profiles.zha
import zigpy.zcl.clusters.closures as closures
import zigpy.zcl.clusters.general as general
import zigpy.zcl.foundation as zcl_f
@ -10,7 +12,6 @@ from homeassistant.const import STATE_LOCKED, STATE_UNAVAILABLE, STATE_UNLOCKED
from .common import (
async_enable_traffic,
async_init_zigpy_device,
find_entity_id,
make_attribute,
make_zcl_header,
@ -22,24 +23,29 @@ LOCK_DOOR = 0
UNLOCK_DOOR = 1
async def test_lock(hass, config_entry, zha_gateway):
"""Test zha lock platform."""
@pytest.fixture(params=["zha_device_joined", "zha_device_restored"])
async def lock(hass, zha_gateway, zigpy_device_mock, request):
"""Lock cluster fixture."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass,
[closures.DoorLock.cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [closures.DoorLock.cluster_id, general.Basic.cluster_id],
"out_clusters": [],
"device_type": zigpy.profiles.zha.DeviceType.DOOR_LOCK,
}
},
)
# load up lock domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
join_or_restore = request.getfixturevalue(request.param)
zha_device = await join_or_restore(zigpy_device)
return zha_device, zigpy_device.endpoints[1].door_lock
cluster = zigpy_device.endpoints.get(1).door_lock
zha_device = zha_gateway.get_device(zigpy_device.ieee)
async def test_lock(hass, zha_gateway, lock):
"""Test zha lock platform."""
zha_device, cluster = lock
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None

View File

@ -11,7 +11,6 @@ import zigpy.zcl.foundation as zcl_f
from homeassistant.components.sensor import DOMAIN
import homeassistant.config as config_util
from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_UNIT_OF_MEASUREMENT,
CONF_UNIT_SYSTEM,
CONF_UNIT_SYSTEM_IMPERIAL,
@ -26,160 +25,43 @@ from homeassistant.util import dt as dt_util
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
)
async def test_sensor(hass, config_entry, zha_gateway):
"""Test zha sensor platform."""
# list of cluster ids to create devices and sensor entities for
cluster_ids = [
measurement.RelativeHumidity.cluster_id,
measurement.TemperatureMeasurement.cluster_id,
measurement.PressureMeasurement.cluster_id,
measurement.IlluminanceMeasurement.cluster_id,
smartenergy.Metering.cluster_id,
homeautomation.ElectricalMeasurement.cluster_id,
]
# devices that were created from cluster_ids list above
zigpy_device_infos = await async_build_devices(
hass, zha_gateway, config_entry, cluster_ids
)
# ensure the sensor entity was created for each id in cluster_ids
for cluster_id in cluster_ids:
zigpy_device_info = zigpy_device_infos[cluster_id]
entity_id = zigpy_device_info[ATTR_ENTITY_ID]
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and devices
await async_enable_traffic(
hass,
zha_gateway,
[
zigpy_device_info["zha_device"]
for zigpy_device_info in zigpy_device_infos.values()
],
)
# test that the sensors now have a state of unknown
for cluster_id in cluster_ids:
zigpy_device_info = zigpy_device_infos[cluster_id]
entity_id = zigpy_device_info[ATTR_ENTITY_ID]
assert hass.states.get(entity_id).state == STATE_UNKNOWN
# get the humidity device info and test the associated sensor logic
device_info = zigpy_device_infos[measurement.RelativeHumidity.cluster_id]
await async_test_humidity(hass, device_info)
# get the temperature device info and test the associated sensor logic
device_info = zigpy_device_infos[measurement.TemperatureMeasurement.cluster_id]
await async_test_temperature(hass, device_info)
# get the pressure device info and test the associated sensor logic
device_info = zigpy_device_infos[measurement.PressureMeasurement.cluster_id]
await async_test_pressure(hass, device_info)
# get the illuminance device info and test the associated sensor logic
device_info = zigpy_device_infos[measurement.IlluminanceMeasurement.cluster_id]
await async_test_illuminance(hass, device_info)
# get the metering device info and test the associated sensor logic
device_info = zigpy_device_infos[smartenergy.Metering.cluster_id]
await async_test_metering(hass, device_info)
# get the electrical_measurement device info and test the associated
# sensor logic
device_info = zigpy_device_infos[homeautomation.ElectricalMeasurement.cluster_id]
await async_test_electrical_measurement(hass, device_info)
# test joining a new temperature sensor to the network
await async_test_device_join(
hass, zha_gateway, measurement.TemperatureMeasurement.cluster_id, entity_id
)
async def async_build_devices(hass, zha_gateway, config_entry, cluster_ids):
"""Build a zigpy device for each cluster id.
This will build devices for all cluster ids that exist in cluster_ids.
They get added to the network and then the sensor component is loaded
which will cause sensor entities to get created for each device.
A dict containing relevant device info for testing is returned. It contains
the entity id, zigpy device, and the zigbee cluster for the sensor.
"""
device_infos = {}
counter = 0
for cluster_id in cluster_ids:
# create zigpy device
device_infos[cluster_id] = {"zigpy_device": None}
device_infos[cluster_id]["zigpy_device"] = await async_init_zigpy_device(
hass,
[cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
ieee=f"00:15:8d:00:02:32:4f:0{counter}",
manufacturer=f"Fake{cluster_id}",
model=f"FakeModel{cluster_id}",
)
counter += 1
# load up sensor domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
# put the other relevant info in the device info dict
for cluster_id in cluster_ids:
device_info = device_infos[cluster_id]
zigpy_device = device_info["zigpy_device"]
device_info["cluster"] = zigpy_device.endpoints.get(1).in_clusters[cluster_id]
zha_device = zha_gateway.get_device(zigpy_device.ieee)
device_info["zha_device"] = zha_device
device_info[ATTR_ENTITY_ID] = await find_entity_id(DOMAIN, zha_device, hass)
await hass.async_block_till_done()
return device_infos
async def async_test_humidity(hass, device_info):
async def async_test_humidity(hass, cluster, entity_id):
"""Test humidity sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 1000)
assert_state(hass, device_info, "10.0", "%")
await send_attribute_report(hass, cluster, 0, 1000)
assert_state(hass, entity_id, "10.0", "%")
async def async_test_temperature(hass, device_info):
async def async_test_temperature(hass, cluster, entity_id):
"""Test temperature sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 2900)
assert_state(hass, device_info, "29.0", "°C")
await send_attribute_report(hass, cluster, 0, 2900)
assert_state(hass, entity_id, "29.0", "°C")
async def async_test_pressure(hass, device_info):
async def async_test_pressure(hass, cluster, entity_id):
"""Test pressure sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 1000)
assert_state(hass, device_info, "1000", "hPa")
await send_attribute_report(hass, cluster, 0, 1000)
assert_state(hass, entity_id, "1000", "hPa")
async def async_test_illuminance(hass, device_info):
async def async_test_illuminance(hass, cluster, entity_id):
"""Test illuminance sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 10)
assert_state(hass, device_info, "1.0", "lx")
await send_attribute_report(hass, cluster, 0, 10)
assert_state(hass, entity_id, "1.0", "lx")
async def async_test_metering(hass, device_info):
async def async_test_metering(hass, cluster, entity_id):
"""Test metering sensor."""
await send_attribute_report(hass, device_info["cluster"], 1024, 12345)
assert_state(hass, device_info, "12345.0", "unknown")
await send_attribute_report(hass, cluster, 1024, 12345)
assert_state(hass, entity_id, "12345.0", "unknown")
async def async_test_electrical_measurement(hass, device_info):
async def async_test_electrical_measurement(hass, cluster, entity_id):
"""Test electrical measurement sensor."""
with mock.patch(
(
@ -189,18 +71,81 @@ async def async_test_electrical_measurement(hass, device_info):
new_callable=mock.PropertyMock,
) as divisor_mock:
divisor_mock.return_value = 1
await send_attribute_report(hass, device_info["cluster"], 1291, 100)
assert_state(hass, device_info, "100", "W")
await send_attribute_report(hass, cluster, 1291, 100)
assert_state(hass, entity_id, "100", "W")
await send_attribute_report(hass, device_info["cluster"], 1291, 99)
assert_state(hass, device_info, "99", "W")
await send_attribute_report(hass, cluster, 1291, 99)
assert_state(hass, entity_id, "99", "W")
divisor_mock.return_value = 10
await send_attribute_report(hass, device_info["cluster"], 1291, 1000)
assert_state(hass, device_info, "100", "W")
await send_attribute_report(hass, cluster, 1291, 1000)
assert_state(hass, entity_id, "100", "W")
await send_attribute_report(hass, device_info["cluster"], 1291, 99)
assert_state(hass, device_info, "9.9", "W")
await send_attribute_report(hass, cluster, 1291, 99)
assert_state(hass, entity_id, "9.9", "W")
@pytest.mark.parametrize(
"cluster_id, test_func, report_count",
(
(measurement.RelativeHumidity.cluster_id, async_test_humidity, 1),
(measurement.TemperatureMeasurement.cluster_id, async_test_temperature, 1),
(measurement.PressureMeasurement.cluster_id, async_test_pressure, 1),
(measurement.IlluminanceMeasurement.cluster_id, async_test_illuminance, 1),
(smartenergy.Metering.cluster_id, async_test_metering, 1),
(
homeautomation.ElectricalMeasurement.cluster_id,
async_test_electrical_measurement,
1,
),
),
)
async def test_sensor(
hass,
zha_gateway,
zigpy_device_mock,
zha_device_joined_restored,
cluster_id,
test_func,
report_count,
):
"""Test zha sensor platform."""
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [cluster_id, general.Basic.cluster_id],
"out_cluster": [],
"device_type": 0x0000,
}
}
)
cluster = zigpy_device.endpoints[1].in_clusters[cluster_id]
zha_device = await zha_device_joined_restored(zigpy_device)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
# ensure the sensor entity was created
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and devices
await async_enable_traffic(hass, zha_gateway, [zha_device])
# test that the sensor now have a state of unknown
assert hass.states.get(entity_id).state == STATE_UNKNOWN
# test sensor associated logic
await test_func(hass, cluster, entity_id)
# test rejoin
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
await test_func(hass, cluster, entity_id)
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == report_count
assert cluster.configure_reporting.await_count == report_count
async def send_attribute_report(hass, cluster, attrid, value):
@ -215,13 +160,13 @@ async def send_attribute_report(hass, cluster, attrid, value):
await hass.async_block_till_done()
def assert_state(hass, device_info, state, unit_of_measurement):
def assert_state(hass, entity_id, state, unit_of_measurement):
"""Check that the state is what is expected.
This is used to ensure that the logic in each sensor class handled the
attribute report it received correctly.
"""
hass_state = hass.states.get(device_info[ATTR_ENTITY_ID])
hass_state = hass.states.get(entity_id)
assert hass_state.state == state
assert hass_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) == unit_of_measurement
@ -282,7 +227,15 @@ def core_rs(hass_storage):
],
)
async def test_temp_uom(
uom, raw_temp, expected, restore, hass_ms, config_entry, zha_gateway, core_rs
uom,
raw_temp,
expected,
restore,
hass_ms,
zha_gateway,
core_rs,
zigpy_device_mock,
zha_device_restored,
):
"""Test zha temperature sensor unit of measurement."""
@ -294,17 +247,22 @@ async def test_temp_uom(
CONF_UNIT_SYSTEM_METRIC if uom == TEMP_CELSIUS else CONF_UNIT_SYSTEM_IMPERIAL
)
# list of cluster ids to create devices and sensor entities for
temp_cluster = measurement.TemperatureMeasurement
cluster_ids = [temp_cluster.cluster_id]
# devices that were created from cluster_ids list above
zigpy_device_infos = await async_build_devices(
hass, zha_gateway, config_entry, cluster_ids
zigpy_device = zigpy_device_mock(
{
1: {
"in_clusters": [
measurement.TemperatureMeasurement.cluster_id,
general.Basic.cluster_id,
],
"out_cluster": [],
"device_type": 0x0000,
}
}
)
cluster = zigpy_device.endpoints[1].temperature
zha_device = await zha_device_restored(zigpy_device)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
zigpy_device_info = zigpy_device_infos[temp_cluster.cluster_id]
zha_device = zigpy_device_info["zha_device"]
if not restore:
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
@ -315,7 +273,7 @@ async def test_temp_uom(
if not restore:
assert hass.states.get(entity_id).state == STATE_UNKNOWN
await send_attribute_report(hass, zigpy_device_info["cluster"], 0, raw_temp)
await send_attribute_report(hass, cluster, 0, raw_temp)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state is not None

View File

@ -1,6 +1,7 @@
"""Test zha switch."""
from unittest.mock import call, patch
import pytest
import zigpy.zcl.clusters.general as general
import zigpy.zcl.foundation as zcl_f
@ -9,8 +10,6 @@ from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from .common import (
async_enable_traffic,
async_init_zigpy_device,
async_test_device_join,
find_entity_id,
make_attribute,
make_zcl_header,
@ -22,24 +21,24 @@ ON = 1
OFF = 0
async def test_switch(hass, config_entry, zha_gateway):
@pytest.fixture
def zigpy_device(zigpy_device_mock):
"""Device tracker zigpy device."""
endpoints = {
1: {
"in_clusters": [general.Basic.cluster_id, general.OnOff.cluster_id],
"out_clusters": [],
"device_type": 0,
}
}
return zigpy_device_mock(endpoints)
async def test_switch(hass, zha_gateway, zha_device_joined_restored, zigpy_device):
"""Test zha switch platform."""
# create zigpy device
zigpy_device = await async_init_zigpy_device(
hass,
[general.OnOff.cluster_id, general.Basic.cluster_id],
[],
None,
zha_gateway,
)
# load up switch domain
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
await hass.async_block_till_done()
zha_device = await zha_device_joined_restored(zigpy_device)
cluster = zigpy_device.endpoints.get(1).on_off
zha_device = zha_gateway.get_device(zigpy_device.ieee)
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
assert entity_id is not None
@ -94,4 +93,11 @@ async def test_switch(hass, config_entry, zha_gateway):
)
# test joining a new switch to the network and HA
await async_test_device_join(hass, zha_gateway, general.OnOff.cluster_id, entity_id)
cluster.bind.reset_mock()
cluster.configure_reporting.reset_mock()
await zha_gateway.async_device_initialized(zigpy_device)
await hass.async_block_till_done()
assert cluster.bind.call_count == 1
assert cluster.bind.await_count == 1
assert cluster.configure_reporting.call_count == 1
assert cluster.configure_reporting.await_count == 1