diff --git a/requirements_test_all.txt b/requirements_test_all.txt index c46596e209e..5feb4165155 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -49,6 +49,9 @@ aiounifi==4 # homeassistant.components.notify.apns apns2==0.3.0 +# homeassistant.components.zha +bellows==0.7.0 + # homeassistant.components.calendar.caldav caldav==0.5.0 @@ -298,3 +301,6 @@ wakeonlan==1.1.6 # homeassistant.components.cloud warrant==0.6.1 + +# homeassistant.components.zha +zigpy==0.2.0 diff --git a/script/gen_requirements_all.py b/script/gen_requirements_all.py index 398b2791848..4a99ef84bc9 100755 --- a/script/gen_requirements_all.py +++ b/script/gen_requirements_all.py @@ -124,6 +124,8 @@ TEST_REQUIREMENTS = ( 'vultr', 'YesssSMS', 'ruamel.yaml', + 'zigpy', + 'bellows', ) IGNORE_PACKAGES = ( diff --git a/tests/components/zha/common.py b/tests/components/zha/common.py new file mode 100644 index 00000000000..ea0e5f43467 --- /dev/null +++ b/tests/components/zha/common.py @@ -0,0 +1,152 @@ +"""Common test objects.""" +import time +from unittest.mock import Mock +from homeassistant.components.zha.core.helpers import convert_ieee +from homeassistant.components.zha.core.const import ( + DATA_ZHA, DATA_ZHA_CONFIG, DATA_ZHA_DISPATCHERS, DATA_ZHA_BRIDGE_ID +) +from homeassistant.util import slugify + + +class FakeApplication: + """Fake application for mocking zigpy.""" + + def __init__(self): + """Init fake application.""" + self.ieee = convert_ieee("00:15:8d:00:02:32:4f:32") + self.nwk = 0x087d + + +APPLICATION = FakeApplication() + + +class FakeEndpoint: + """Fake endpoint for moking zigpy.""" + + def __init__(self): + """Init fake endpoint.""" + from zigpy.profiles.zha import PROFILE_ID + self.device = None + self.endpoint_id = 1 + self.in_clusters = {} + self.out_clusters = {} + self._cluster_attr = {} + self.status = 1 + self.manufacturer = 'FakeManufacturer' + self.model = 'FakeModel' + self.profile_id = PROFILE_ID + self.device_type = None + + def add_input_cluster(self, cluster_id): + """Add an input cluster.""" + from zigpy.zcl import Cluster + cluster = Cluster.from_id(self, cluster_id) + patch_cluster(cluster) + self.in_clusters[cluster_id] = cluster + if hasattr(cluster, 'ep_attribute'): + setattr(self, cluster.ep_attribute, cluster) + + def add_output_cluster(self, cluster_id): + """Add an output cluster.""" + from zigpy.zcl import Cluster + cluster = Cluster.from_id(self, cluster_id) + patch_cluster(cluster) + self.out_clusters[cluster_id] = cluster + + +def patch_cluster(cluster): + """Patch a cluster for testing.""" + cluster.deserialize = Mock() + cluster.handle_cluster_request = Mock() + cluster.handle_cluster_general_request = Mock() + cluster.read_attributes_raw = Mock() + cluster.read_attributes = Mock() + cluster.write_attributes = Mock() + cluster.bind = Mock() + cluster.unbind = Mock() + cluster.configure_reporting = Mock() + + +class FakeDevice: + """Fake device for mocking zigpy.""" + + def __init__(self): + """Init fake device.""" + self._application = APPLICATION + self.ieee = convert_ieee("00:0d:6f:00:0a:90:69:e7") + self.nwk = 0xb79c + self.zdo = Mock() + self.endpoints = {0: self.zdo} + self.lqi = 255 + self.rssi = 8 + self.last_seen = time.time() + self.status = 2 + self.initializing = False + self.manufacturer = 'FakeManufacturer' + self.model = 'FakeModel' + + +def make_device(in_cluster_ids, out_cluster_ids, device_type): + """Make a fake device using the specified cluster classes.""" + device = FakeDevice() + endpoint = FakeEndpoint() + endpoint.device = device + device.endpoints[endpoint.endpoint_id] = endpoint + endpoint.device_type = device_type + + for cluster_id in in_cluster_ids: + endpoint.add_input_cluster(cluster_id) + + for cluster_id in out_cluster_ids: + endpoint.add_output_cluster(cluster_id) + + return device + + +async def async_init_zigpy_device( + hass, in_cluster_ids, out_cluster_ids, device_type, gateway): + """Create and initialize a device.""" + device = make_device(in_cluster_ids, out_cluster_ids, device_type) + await gateway.async_device_initialized(device, False) + await hass.async_block_till_done() + return device + + +def make_attribute(attrid, value, status=0): + """Make an attribute.""" + from zigpy.zcl.foundation import Attribute, TypeValue + attr = Attribute() + attr.attrid = attrid + attr.value = TypeValue() + attr.value.value = value + return attr + + +async def async_setup_entry(hass, config_entry): + """Mock setup entry for zha.""" + hass.data[DATA_ZHA][DATA_ZHA_CONFIG] = {} + hass.data[DATA_ZHA][DATA_ZHA_DISPATCHERS] = [] + hass.data[DATA_ZHA][DATA_ZHA_BRIDGE_ID] = APPLICATION.ieee + return True + + +def make_entity_id(domain, device, cluster): + """Make the entity id for the entity under testing.""" + ieee = device.ieee + ieeetail = ''.join(['%02x' % (o, ) for o in ieee[-4:]]) + entity_id = "{}.{}_{}_{}_{}{}".format( + domain, + slugify(device.manufacturer), + slugify(device.model), + ieeetail, + cluster.endpoint.endpoint_id, + "_{}".format(cluster.cluster_id), + ) + return entity_id + + +async def async_enable_traffic(hass, zha_gateway, zha_device): + """Allow traffic to flow through the gateway and the zha device.""" + await zha_gateway.accept_zigbee_messages({}) + zha_device.update_available(True) + await hass.async_block_till_done() diff --git a/tests/components/zha/conftest.py b/tests/components/zha/conftest.py new file mode 100644 index 00000000000..59509e2ab03 --- /dev/null +++ b/tests/components/zha/conftest.py @@ -0,0 +1,38 @@ +"""Test configuration for the ZHA component.""" +from unittest.mock import patch +import pytest +from homeassistant import config_entries +from homeassistant.components.zha.core.const import ( + DOMAIN, DATA_ZHA +) +from homeassistant.components.zha.core.gateway import ZHAGateway +from .common import async_setup_entry + + +@pytest.fixture(name='config_entry') +def config_entry_fixture(hass): + """Fixture representing a config entry.""" + config_entry = config_entries.ConfigEntry( + 1, DOMAIN, 'Mock Title', {}, 'test', + config_entries.CONN_CLASS_LOCAL_PUSH) + return config_entry + + +@pytest.fixture(name='zha_gateway') +def zha_gateway_fixture(hass): + """Fixture representing a zha gateway.""" + return ZHAGateway(hass, {}) + + +@pytest.fixture(autouse=True) +async def setup_zha(hass, config_entry): + """Load the ZHA component.""" + # this prevents needing an actual radio and zigbee network available + with patch('homeassistant.components.zha.async_setup_entry', + async_setup_entry): + hass.data[DATA_ZHA] = {} + + # init ZHA + await hass.config_entries.async_forward_entry_setup( + config_entry, DOMAIN) + await hass.async_block_till_done() diff --git a/tests/components/zha/test_switch.py b/tests/components/zha/test_switch.py new file mode 100644 index 00000000000..e86eb7fdd9b --- /dev/null +++ b/tests/components/zha/test_switch.py @@ -0,0 +1,66 @@ +"""Test zha switch.""" +from unittest.mock import call, patch +from homeassistant.components.switch import DOMAIN +from homeassistant.const import STATE_ON, STATE_OFF +from tests.common import mock_coro +from .common import ( + async_init_zigpy_device, make_attribute, make_entity_id +) + +ON = 1 +OFF = 0 + + +async def test_switch(hass, config_entry, zha_gateway): + """Test zha switch platform.""" + from zigpy.zcl.clusters.general import OnOff + from zigpy.zcl.foundation import Status + + # create zigpy device + zigpy_device = await async_init_zigpy_device( + hass, [OnOff.cluster_id], [], None, zha_gateway) + + # load up switch domain + await hass.config_entries.async_forward_entry_setup( + config_entry, DOMAIN) + await hass.async_block_till_done() + + cluster = zigpy_device.endpoints.get(1).on_off + entity_id = make_entity_id(DOMAIN, zigpy_device, cluster) + + # test that the state has changed from unavailable to off + assert hass.states.get(entity_id).state == STATE_OFF + + # turn on at switch + attr = make_attribute(0, 1) + cluster.handle_message(False, 1, 0x0a, [[attr]]) + await hass.async_block_till_done() + assert hass.states.get(entity_id).state == STATE_ON + + # turn off at switch + attr.value.value = 0 + cluster.handle_message(False, 0, 0x0a, [[attr]]) + await hass.async_block_till_done() + assert hass.states.get(entity_id).state == STATE_OFF + + with patch( + 'zigpy.zcl.Cluster.request', + return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])): + # turn on via UI + await hass.services.async_call(DOMAIN, 'turn_on', { + 'entity_id': entity_id + }, blocking=True) + assert len(cluster.request.mock_calls) == 1 + assert cluster.request.call_args == call( + False, ON, (), expect_reply=True, manufacturer=None) + + with patch( + 'zigpy.zcl.Cluster.request', + return_value=mock_coro([Status.SUCCESS, Status.SUCCESS])): + # turn off via UI + await hass.services.async_call(DOMAIN, 'turn_off', { + 'entity_id': entity_id + }, blocking=True) + assert len(cluster.request.mock_calls) == 1 + assert cluster.request.call_args == call( + False, OFF, (), expect_reply=True, manufacturer=None)