Add ZHA sensor tests (#20710)

* add sensor tests

* update switch test

* add sensor back to coveragerc

* review comments

* added comments
This commit is contained in:
David F. Mulcahey 2019-02-03 16:03:35 -05:00 committed by Martin Hjelmare
parent 74cdf7c347
commit 9c11602674
4 changed files with 247 additions and 26 deletions

View File

@ -1,11 +1,13 @@
"""Common test objects."""
import time
from unittest.mock import Mock
from unittest.mock import patch, Mock
from homeassistant.const import STATE_UNKNOWN
from homeassistant.components.zha.core.helpers import convert_ieee
from homeassistant.components.zha.core.const import (
DATA_ZHA, DATA_ZHA_CONFIG, DATA_ZHA_DISPATCHERS, DATA_ZHA_BRIDGE_ID
)
from homeassistant.util import slugify
from tests.common import mock_coro
class FakeApplication:
@ -23,7 +25,7 @@ APPLICATION = FakeApplication()
class FakeEndpoint:
"""Fake endpoint for moking zigpy."""
def __init__(self):
def __init__(self, manufacturer, model):
"""Init fake endpoint."""
from zigpy.profiles.zha import PROFILE_ID
self.device = None
@ -32,8 +34,8 @@ class FakeEndpoint:
self.out_clusters = {}
self._cluster_attr = {}
self.status = 1
self.manufacturer = 'FakeManufacturer'
self.model = 'FakeModel'
self.manufacturer = manufacturer
self.model = model
self.profile_id = PROFILE_ID
self.device_type = None
@ -61,19 +63,16 @@ def patch_cluster(cluster):
cluster.handle_cluster_general_request = Mock()
cluster.read_attributes_raw = Mock()
cluster.read_attributes = Mock()
cluster.write_attributes = Mock()
cluster.bind = Mock()
cluster.unbind = Mock()
cluster.configure_reporting = Mock()
class FakeDevice:
"""Fake device for mocking zigpy."""
def __init__(self):
def __init__(self, ieee, manufacturer, model):
"""Init fake device."""
self._application = APPLICATION
self.ieee = convert_ieee("00:0d:6f:00:0a:90:69:e7")
self.ieee = convert_ieee(ieee)
self.nwk = 0xb79c
self.zdo = Mock()
self.endpoints = {0: self.zdo}
@ -82,14 +81,15 @@ class FakeDevice:
self.last_seen = time.time()
self.status = 2
self.initializing = False
self.manufacturer = 'FakeManufacturer'
self.model = 'FakeModel'
self.manufacturer = manufacturer
self.model = model
def make_device(in_cluster_ids, out_cluster_ids, device_type):
def make_device(in_cluster_ids, out_cluster_ids, device_type, ieee,
manufacturer, model):
"""Make a fake device using the specified cluster classes."""
device = FakeDevice()
endpoint = FakeEndpoint()
device = FakeDevice(ieee, manufacturer, model)
endpoint = FakeEndpoint(manufacturer, model)
endpoint.device = device
device.endpoints[endpoint.endpoint_id] = endpoint
endpoint.device_type = device_type
@ -104,10 +104,20 @@ def make_device(in_cluster_ids, out_cluster_ids, device_type):
async def async_init_zigpy_device(
hass, in_cluster_ids, out_cluster_ids, device_type, gateway):
"""Create and initialize a device."""
device = make_device(in_cluster_ids, out_cluster_ids, device_type)
await gateway.async_device_initialized(device, False)
hass, in_cluster_ids, out_cluster_ids, device_type, gateway,
ieee="00:0d:6f:00:0a:90:69:e7", manufacturer="FakeManufacturer",
model="FakeModel", is_new_join=False):
"""Create and initialize a device.
This creates a fake device and adds it to the "network". It can be used to
test existing device functionality and new device pairing functionality.
The is_new_join parameter influences whether or not the device will go
through cluster binding and zigbee cluster configure reporting. That only
happens when the device is paired to the network for the first time.
"""
device = make_device(in_cluster_ids, out_cluster_ids, device_type, ieee,
manufacturer, model)
await gateway.async_device_initialized(device, is_new_join)
await hass.async_block_till_done()
return device
@ -130,8 +140,12 @@ async def async_setup_entry(hass, config_entry):
return True
def make_entity_id(domain, device, cluster):
"""Make the entity id for the entity under testing."""
def make_entity_id(domain, device, cluster, use_suffix=True):
"""Make the entity id for the entity under testing.
This is used to get the entity id in order to get the state from the state
machine so that we can test state changes.
"""
ieee = device.ieee
ieeetail = ''.join(['%02x' % (o, ) for o in ieee[-4:]])
entity_id = "{}.{}_{}_{}_{}{}".format(
@ -140,13 +154,43 @@ def make_entity_id(domain, device, cluster):
slugify(device.model),
ieeetail,
cluster.endpoint.endpoint_id,
"_{}".format(cluster.cluster_id),
("", "_{}".format(cluster.cluster_id))[use_suffix],
)
return entity_id
async def async_enable_traffic(hass, zha_gateway, zha_device):
async def async_enable_traffic(hass, zha_gateway, zha_devices):
"""Allow traffic to flow through the gateway and the zha device."""
await zha_gateway.accept_zigbee_messages({})
zha_device.update_available(True)
for zha_device in zha_devices:
zha_device.update_available(True)
await hass.async_block_till_done()
async def async_test_device_join(
hass, zha_gateway, cluster_id, domain, device_type=None,
expected_state=STATE_UNKNOWN):
"""Test a newly joining device.
This creates a new fake device and adds it to the network. It is meant to
simulate pairing a new device to the network so that code pathways that
only trigger during device joins can be tested.
"""
from zigpy.zcl.foundation import Status
# create zigpy device mocking out the zigbee network operations
with patch(
'zigpy.zcl.Cluster.configure_reporting',
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])):
with patch(
'zigpy.zcl.Cluster.bind',
return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])):
zigpy_device = await async_init_zigpy_device(
hass, [cluster_id], [], device_type, zha_gateway,
ieee="00:0d:6f:00:0a:90:69:f7",
manufacturer="FakeMan{}".format(cluster_id),
model="FakeMod{}".format(cluster_id),
is_new_join=True)
cluster = zigpy_device.endpoints.get(1).in_clusters[cluster_id]
entity_id = make_entity_id(
domain, zigpy_device, cluster, use_suffix=device_type is None)
assert hass.states.get(entity_id).state == expected_state

View File

@ -20,13 +20,22 @@ def config_entry_fixture(hass):
@pytest.fixture(name='zha_gateway')
def zha_gateway_fixture(hass):
"""Fixture representing a zha gateway."""
"""Fixture representing a zha gateway.
Create a ZHAGateway object that can be used to interact with as if we
had a real zigbee network running.
"""
return ZHAGateway(hass, {})
@pytest.fixture(autouse=True)
async def setup_zha(hass, config_entry):
"""Load the ZHA component."""
"""Load the ZHA component.
This will init the ZHA component. It loads the component in HA so that
we can test the domains that ZHA supports without actually having a zigbee
network running.
"""
# this prevents needing an actual radio and zigbee network available
with patch('homeassistant.components.zha.async_setup_entry',
async_setup_entry):

View File

@ -0,0 +1,164 @@
"""Test zha sensor."""
from homeassistant.components.sensor import DOMAIN
from homeassistant.const import STATE_UNKNOWN
from .common import (
async_init_zigpy_device, make_attribute, make_entity_id,
async_test_device_join
)
async def test_sensor(hass, config_entry, zha_gateway):
"""Test zha sensor platform."""
from zigpy.zcl.clusters.measurement import (
RelativeHumidity, TemperatureMeasurement, PressureMeasurement,
IlluminanceMeasurement
)
from zigpy.zcl.clusters.smartenergy import Metering
from zigpy.zcl.clusters.homeautomation import ElectricalMeasurement
# list of cluster ids to create devices and sensor entities for
cluster_ids = [
RelativeHumidity.cluster_id,
TemperatureMeasurement.cluster_id,
PressureMeasurement.cluster_id,
IlluminanceMeasurement.cluster_id,
Metering.cluster_id,
ElectricalMeasurement.cluster_id
]
# devices that were created from cluster_ids list above
zigpy_device_infos = await async_build_devices(
hass, zha_gateway, config_entry, cluster_ids)
# ensure the sensor entity was created for each id in cluster_ids
for cluster_id in cluster_ids:
zigpy_device_info = zigpy_device_infos[cluster_id]
entity_id = zigpy_device_info["entity_id"]
assert hass.states.get(entity_id).state == STATE_UNKNOWN
# get the humidity device info and test the associated sensor logic
device_info = zigpy_device_infos[RelativeHumidity.cluster_id]
await async_test_humidity(hass, device_info)
# get the temperature device info and test the associated sensor logic
device_info = zigpy_device_infos[TemperatureMeasurement.cluster_id]
await async_test_temperature(hass, device_info)
# get the pressure device info and test the associated sensor logic
device_info = zigpy_device_infos[PressureMeasurement.cluster_id]
await async_test_pressure(hass, device_info)
# get the illuminance device info and test the associated sensor logic
device_info = zigpy_device_infos[IlluminanceMeasurement.cluster_id]
await async_test_illuminance(hass, device_info)
# get the metering device info and test the associated sensor logic
device_info = zigpy_device_infos[Metering.cluster_id]
await async_test_metering(hass, device_info)
# get the electrical_measurement device info and test the associated
# sensor logic
device_info = zigpy_device_infos[ElectricalMeasurement.cluster_id]
await async_test_electrical_measurement(hass, device_info)
# test joining a new temperature sensor to the network
await async_test_device_join(
hass, zha_gateway, TemperatureMeasurement.cluster_id, DOMAIN)
async def async_build_devices(hass, zha_gateway, config_entry, cluster_ids):
"""Build a zigpy device for each cluster id.
This will build devices for all cluster ids that exist in cluster_ids.
They get added to the network and then the sensor component is loaded
which will cause sensor entites to get created for each device.
A dict containing relevant device info for testing is returned. It contains
the entity id, zigpy device, and the zigbee cluster for the sensor.
"""
device_infos = {}
counter = 0
for cluster_id in cluster_ids:
# create zigpy device
device_infos[cluster_id] = {"zigpy_device": None}
device_infos[cluster_id]["zigpy_device"] = await \
async_init_zigpy_device(
hass, [cluster_id], [], None, zha_gateway,
ieee="{}0:15:8d:00:02:32:4f:32".format(counter),
manufacturer="Fake{}".format(cluster_id),
model="FakeModel{}".format(cluster_id))
counter += 1
# load up sensor domain
await hass.config_entries.async_forward_entry_setup(
config_entry, DOMAIN)
await hass.async_block_till_done()
# put the other relevant info in the device info dict
for cluster_id in cluster_ids:
device_info = device_infos[cluster_id]
zigpy_device = device_info["zigpy_device"]
device_info["cluster"] = zigpy_device.endpoints.get(
1).in_clusters[cluster_id]
device_info["entity_id"] = make_entity_id(
DOMAIN, zigpy_device, device_info["cluster"])
return device_infos
async def async_test_humidity(hass, device_info):
"""Test humidity sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 1000)
assert_state(hass, device_info, '10.0', '%')
async def async_test_temperature(hass, device_info):
"""Test temperature sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 2900)
assert_state(hass, device_info, '29.0', '°C')
async def async_test_pressure(hass, device_info):
"""Test pressure sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 1000)
assert_state(hass, device_info, '1000', 'hPa')
async def async_test_illuminance(hass, device_info):
"""Test illuminance sensor."""
await send_attribute_report(hass, device_info["cluster"], 0, 10)
assert_state(hass, device_info, '10', 'lx')
async def async_test_metering(hass, device_info):
"""Test metering sensor."""
await send_attribute_report(hass, device_info["cluster"], 1024, 10)
assert_state(hass, device_info, '10', 'W')
async def async_test_electrical_measurement(hass, device_info):
"""Test electrical measurement sensor."""
await send_attribute_report(hass, device_info["cluster"], 1291, 100)
assert_state(hass, device_info, '10.0', 'W')
async def send_attribute_report(hass, cluster, attrid, value):
"""Cause the sensor to receive an attribute report from the network.
This is to simulate the normal device communication that happens when a
device is paired to the zigbee network.
"""
attr = make_attribute(attrid, value)
cluster.handle_message(False, 1, 0x0a, [[attr]])
await hass.async_block_till_done()
def assert_state(hass, device_info, state, unit_of_measurement):
"""Check that the state is what is expected.
This is used to ensure that the logic in each sensor class handled the
attribute report it received correctly.
"""
hass_state = hass.states.get(device_info["entity_id"])
assert hass_state.state == state
assert hass_state.attributes.get('unit_of_measurement') == \
unit_of_measurement

View File

@ -4,7 +4,8 @@ from homeassistant.components.switch import DOMAIN
from homeassistant.const import STATE_ON, STATE_OFF
from tests.common import mock_coro
from .common import (
async_init_zigpy_device, make_attribute, make_entity_id
async_init_zigpy_device, make_attribute, make_entity_id,
async_test_device_join
)
ON = 1
@ -64,3 +65,6 @@ async def test_switch(hass, config_entry, zha_gateway):
assert len(cluster.request.mock_calls) == 1
assert cluster.request.call_args == call(
False, OFF, (), expect_reply=True, manufacturer=None)
await async_test_device_join(
hass, zha_gateway, OnOff.cluster_id, DOMAIN, expected_state=STATE_OFF)