diff --git a/tests/components/zha/common.py b/tests/components/zha/common.py index ea0e5f43467..c7a9c786054 100644 --- a/tests/components/zha/common.py +++ b/tests/components/zha/common.py @@ -1,11 +1,13 @@ """Common test objects.""" import time -from unittest.mock import Mock +from unittest.mock import patch, Mock +from homeassistant.const import STATE_UNKNOWN from homeassistant.components.zha.core.helpers import convert_ieee from homeassistant.components.zha.core.const import ( DATA_ZHA, DATA_ZHA_CONFIG, DATA_ZHA_DISPATCHERS, DATA_ZHA_BRIDGE_ID ) from homeassistant.util import slugify +from tests.common import mock_coro class FakeApplication: @@ -23,7 +25,7 @@ APPLICATION = FakeApplication() class FakeEndpoint: """Fake endpoint for moking zigpy.""" - def __init__(self): + def __init__(self, manufacturer, model): """Init fake endpoint.""" from zigpy.profiles.zha import PROFILE_ID self.device = None @@ -32,8 +34,8 @@ class FakeEndpoint: self.out_clusters = {} self._cluster_attr = {} self.status = 1 - self.manufacturer = 'FakeManufacturer' - self.model = 'FakeModel' + self.manufacturer = manufacturer + self.model = model self.profile_id = PROFILE_ID self.device_type = None @@ -61,19 +63,16 @@ def patch_cluster(cluster): cluster.handle_cluster_general_request = Mock() cluster.read_attributes_raw = Mock() cluster.read_attributes = Mock() - cluster.write_attributes = Mock() - cluster.bind = Mock() cluster.unbind = Mock() - cluster.configure_reporting = Mock() class FakeDevice: """Fake device for mocking zigpy.""" - def __init__(self): + def __init__(self, ieee, manufacturer, model): """Init fake device.""" self._application = APPLICATION - self.ieee = convert_ieee("00:0d:6f:00:0a:90:69:e7") + self.ieee = convert_ieee(ieee) self.nwk = 0xb79c self.zdo = Mock() self.endpoints = {0: self.zdo} @@ -82,14 +81,15 @@ class FakeDevice: self.last_seen = time.time() self.status = 2 self.initializing = False - self.manufacturer = 'FakeManufacturer' - self.model = 'FakeModel' + self.manufacturer = manufacturer + self.model = model -def make_device(in_cluster_ids, out_cluster_ids, device_type): +def make_device(in_cluster_ids, out_cluster_ids, device_type, ieee, + manufacturer, model): """Make a fake device using the specified cluster classes.""" - device = FakeDevice() - endpoint = FakeEndpoint() + device = FakeDevice(ieee, manufacturer, model) + endpoint = FakeEndpoint(manufacturer, model) endpoint.device = device device.endpoints[endpoint.endpoint_id] = endpoint endpoint.device_type = device_type @@ -104,10 +104,20 @@ def make_device(in_cluster_ids, out_cluster_ids, device_type): async def async_init_zigpy_device( - hass, in_cluster_ids, out_cluster_ids, device_type, gateway): - """Create and initialize a device.""" - device = make_device(in_cluster_ids, out_cluster_ids, device_type) - await gateway.async_device_initialized(device, False) + hass, in_cluster_ids, out_cluster_ids, device_type, gateway, + ieee="00:0d:6f:00:0a:90:69:e7", manufacturer="FakeManufacturer", + model="FakeModel", is_new_join=False): + """Create and initialize a device. + + This creates a fake device and adds it to the "network". It can be used to + test existing device functionality and new device pairing functionality. + The is_new_join parameter influences whether or not the device will go + through cluster binding and zigbee cluster configure reporting. That only + happens when the device is paired to the network for the first time. + """ + device = make_device(in_cluster_ids, out_cluster_ids, device_type, ieee, + manufacturer, model) + await gateway.async_device_initialized(device, is_new_join) await hass.async_block_till_done() return device @@ -130,8 +140,12 @@ async def async_setup_entry(hass, config_entry): return True -def make_entity_id(domain, device, cluster): - """Make the entity id for the entity under testing.""" +def make_entity_id(domain, device, cluster, use_suffix=True): + """Make the entity id for the entity under testing. + + This is used to get the entity id in order to get the state from the state + machine so that we can test state changes. + """ ieee = device.ieee ieeetail = ''.join(['%02x' % (o, ) for o in ieee[-4:]]) entity_id = "{}.{}_{}_{}_{}{}".format( @@ -140,13 +154,43 @@ def make_entity_id(domain, device, cluster): slugify(device.model), ieeetail, cluster.endpoint.endpoint_id, - "_{}".format(cluster.cluster_id), + ("", "_{}".format(cluster.cluster_id))[use_suffix], ) return entity_id -async def async_enable_traffic(hass, zha_gateway, zha_device): +async def async_enable_traffic(hass, zha_gateway, zha_devices): """Allow traffic to flow through the gateway and the zha device.""" await zha_gateway.accept_zigbee_messages({}) - zha_device.update_available(True) + for zha_device in zha_devices: + zha_device.update_available(True) await hass.async_block_till_done() + + +async def async_test_device_join( + hass, zha_gateway, cluster_id, domain, device_type=None, + expected_state=STATE_UNKNOWN): + """Test a newly joining device. + + This creates a new fake device and adds it to the network. It is meant to + simulate pairing a new device to the network so that code pathways that + only trigger during device joins can be tested. + """ + from zigpy.zcl.foundation import Status + # create zigpy device mocking out the zigbee network operations + with patch( + 'zigpy.zcl.Cluster.configure_reporting', + return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])): + with patch( + 'zigpy.zcl.Cluster.bind', + return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])): + zigpy_device = await async_init_zigpy_device( + hass, [cluster_id], [], device_type, zha_gateway, + ieee="00:0d:6f:00:0a:90:69:f7", + manufacturer="FakeMan{}".format(cluster_id), + model="FakeMod{}".format(cluster_id), + is_new_join=True) + cluster = zigpy_device.endpoints.get(1).in_clusters[cluster_id] + entity_id = make_entity_id( + domain, zigpy_device, cluster, use_suffix=device_type is None) + assert hass.states.get(entity_id).state == expected_state diff --git a/tests/components/zha/conftest.py b/tests/components/zha/conftest.py index 59509e2ab03..624c6a02964 100644 --- a/tests/components/zha/conftest.py +++ b/tests/components/zha/conftest.py @@ -20,13 +20,22 @@ def config_entry_fixture(hass): @pytest.fixture(name='zha_gateway') def zha_gateway_fixture(hass): - """Fixture representing a zha gateway.""" + """Fixture representing a zha gateway. + + Create a ZHAGateway object that can be used to interact with as if we + had a real zigbee network running. + """ return ZHAGateway(hass, {}) @pytest.fixture(autouse=True) async def setup_zha(hass, config_entry): - """Load the ZHA component.""" + """Load the ZHA component. + + This will init the ZHA component. It loads the component in HA so that + we can test the domains that ZHA supports without actually having a zigbee + network running. + """ # this prevents needing an actual radio and zigbee network available with patch('homeassistant.components.zha.async_setup_entry', async_setup_entry): diff --git a/tests/components/zha/test_sensor.py b/tests/components/zha/test_sensor.py new file mode 100644 index 00000000000..3933f416e3d --- /dev/null +++ b/tests/components/zha/test_sensor.py @@ -0,0 +1,164 @@ +"""Test zha sensor.""" +from homeassistant.components.sensor import DOMAIN +from homeassistant.const import STATE_UNKNOWN +from .common import ( + async_init_zigpy_device, make_attribute, make_entity_id, + async_test_device_join +) + + +async def test_sensor(hass, config_entry, zha_gateway): + """Test zha sensor platform.""" + from zigpy.zcl.clusters.measurement import ( + RelativeHumidity, TemperatureMeasurement, PressureMeasurement, + IlluminanceMeasurement + ) + from zigpy.zcl.clusters.smartenergy import Metering + from zigpy.zcl.clusters.homeautomation import ElectricalMeasurement + + # list of cluster ids to create devices and sensor entities for + cluster_ids = [ + RelativeHumidity.cluster_id, + TemperatureMeasurement.cluster_id, + PressureMeasurement.cluster_id, + IlluminanceMeasurement.cluster_id, + Metering.cluster_id, + ElectricalMeasurement.cluster_id + ] + + # devices that were created from cluster_ids list above + zigpy_device_infos = await async_build_devices( + hass, zha_gateway, config_entry, cluster_ids) + + # ensure the sensor entity was created for each id in cluster_ids + for cluster_id in cluster_ids: + zigpy_device_info = zigpy_device_infos[cluster_id] + entity_id = zigpy_device_info["entity_id"] + assert hass.states.get(entity_id).state == STATE_UNKNOWN + + # get the humidity device info and test the associated sensor logic + device_info = zigpy_device_infos[RelativeHumidity.cluster_id] + await async_test_humidity(hass, device_info) + + # get the temperature device info and test the associated sensor logic + device_info = zigpy_device_infos[TemperatureMeasurement.cluster_id] + await async_test_temperature(hass, device_info) + + # get the pressure device info and test the associated sensor logic + device_info = zigpy_device_infos[PressureMeasurement.cluster_id] + await async_test_pressure(hass, device_info) + + # get the illuminance device info and test the associated sensor logic + device_info = zigpy_device_infos[IlluminanceMeasurement.cluster_id] + await async_test_illuminance(hass, device_info) + + # get the metering device info and test the associated sensor logic + device_info = zigpy_device_infos[Metering.cluster_id] + await async_test_metering(hass, device_info) + + # get the electrical_measurement device info and test the associated + # sensor logic + device_info = zigpy_device_infos[ElectricalMeasurement.cluster_id] + await async_test_electrical_measurement(hass, device_info) + + # test joining a new temperature sensor to the network + await async_test_device_join( + hass, zha_gateway, TemperatureMeasurement.cluster_id, DOMAIN) + + +async def async_build_devices(hass, zha_gateway, config_entry, cluster_ids): + """Build a zigpy device for each cluster id. + + This will build devices for all cluster ids that exist in cluster_ids. + They get added to the network and then the sensor component is loaded + which will cause sensor entites to get created for each device. + A dict containing relevant device info for testing is returned. It contains + the entity id, zigpy device, and the zigbee cluster for the sensor. + """ + device_infos = {} + counter = 0 + for cluster_id in cluster_ids: + # create zigpy device + device_infos[cluster_id] = {"zigpy_device": None} + device_infos[cluster_id]["zigpy_device"] = await \ + async_init_zigpy_device( + hass, [cluster_id], [], None, zha_gateway, + ieee="{}0:15:8d:00:02:32:4f:32".format(counter), + manufacturer="Fake{}".format(cluster_id), + model="FakeModel{}".format(cluster_id)) + + counter += 1 + + # load up sensor domain + await hass.config_entries.async_forward_entry_setup( + config_entry, DOMAIN) + await hass.async_block_till_done() + + # put the other relevant info in the device info dict + for cluster_id in cluster_ids: + device_info = device_infos[cluster_id] + zigpy_device = device_info["zigpy_device"] + device_info["cluster"] = zigpy_device.endpoints.get( + 1).in_clusters[cluster_id] + device_info["entity_id"] = make_entity_id( + DOMAIN, zigpy_device, device_info["cluster"]) + return device_infos + + +async def async_test_humidity(hass, device_info): + """Test humidity sensor.""" + await send_attribute_report(hass, device_info["cluster"], 0, 1000) + assert_state(hass, device_info, '10.0', '%') + + +async def async_test_temperature(hass, device_info): + """Test temperature sensor.""" + await send_attribute_report(hass, device_info["cluster"], 0, 2900) + assert_state(hass, device_info, '29.0', '°C') + + +async def async_test_pressure(hass, device_info): + """Test pressure sensor.""" + await send_attribute_report(hass, device_info["cluster"], 0, 1000) + assert_state(hass, device_info, '1000', 'hPa') + + +async def async_test_illuminance(hass, device_info): + """Test illuminance sensor.""" + await send_attribute_report(hass, device_info["cluster"], 0, 10) + assert_state(hass, device_info, '10', 'lx') + + +async def async_test_metering(hass, device_info): + """Test metering sensor.""" + await send_attribute_report(hass, device_info["cluster"], 1024, 10) + assert_state(hass, device_info, '10', 'W') + + +async def async_test_electrical_measurement(hass, device_info): + """Test electrical measurement sensor.""" + await send_attribute_report(hass, device_info["cluster"], 1291, 100) + assert_state(hass, device_info, '10.0', 'W') + + +async def send_attribute_report(hass, cluster, attrid, value): + """Cause the sensor to receive an attribute report from the network. + + This is to simulate the normal device communication that happens when a + device is paired to the zigbee network. + """ + attr = make_attribute(attrid, value) + cluster.handle_message(False, 1, 0x0a, [[attr]]) + await hass.async_block_till_done() + + +def assert_state(hass, device_info, state, unit_of_measurement): + """Check that the state is what is expected. + + This is used to ensure that the logic in each sensor class handled the + attribute report it received correctly. + """ + hass_state = hass.states.get(device_info["entity_id"]) + assert hass_state.state == state + assert hass_state.attributes.get('unit_of_measurement') == \ + unit_of_measurement diff --git a/tests/components/zha/test_switch.py b/tests/components/zha/test_switch.py index e86eb7fdd9b..d3415bde59b 100644 --- a/tests/components/zha/test_switch.py +++ b/tests/components/zha/test_switch.py @@ -4,7 +4,8 @@ from homeassistant.components.switch import DOMAIN from homeassistant.const import STATE_ON, STATE_OFF from tests.common import mock_coro from .common import ( - async_init_zigpy_device, make_attribute, make_entity_id + async_init_zigpy_device, make_attribute, make_entity_id, + async_test_device_join ) ON = 1 @@ -64,3 +65,6 @@ async def test_switch(hass, config_entry, zha_gateway): assert len(cluster.request.mock_calls) == 1 assert cluster.request.call_args == call( False, OFF, (), expect_reply=True, manufacturer=None) + + await async_test_device_join( + hass, zha_gateway, OnOff.cluster_id, DOMAIN, expected_state=STATE_OFF)