From 0c2f22d4780612545c483627da729e44d46ee9fd Mon Sep 17 00:00:00 2001 From: rforro Date: Wed, 25 May 2022 01:43:35 +0200 Subject: [PATCH] Add configurable zha switch entity (#71784) * add configurable zha switch entity * final zha configurable switch * fix codecov * replaced errorneous cluster with local quirk * test fix * minor changes --- homeassistant/components/zha/switch.py | 129 ++++++++++++++++- tests/components/zha/test_switch.py | 186 +++++++++++++++++++++++++ 2 files changed, 314 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/zha/switch.py b/homeassistant/components/zha/switch.py index 76c41093ed6..800c42eb932 100644 --- a/homeassistant/components/zha/switch.py +++ b/homeassistant/components/zha/switch.py @@ -2,8 +2,10 @@ from __future__ import annotations import functools -from typing import Any +import logging +from typing import TYPE_CHECKING, Any +import zigpy.exceptions from zigpy.zcl.clusters.general import OnOff from zigpy.zcl.foundation import Status @@ -12,6 +14,7 @@ from homeassistant.config_entries import ConfigEntry from homeassistant.const import STATE_ON, STATE_UNAVAILABLE, Platform from homeassistant.core import HomeAssistant, State, callback from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity_platform import AddEntitiesCallback from .core import discovery @@ -24,8 +27,17 @@ from .core.const import ( from .core.registries import ZHA_ENTITIES from .entity import ZhaEntity, ZhaGroupEntity +if TYPE_CHECKING: + from .core.channels.base import ZigbeeChannel + from .core.device import ZHADevice + STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, Platform.SWITCH) GROUP_MATCH = functools.partial(ZHA_ENTITIES.group_match, Platform.SWITCH) +CONFIG_DIAGNOSTIC_MATCH = functools.partial( + ZHA_ENTITIES.config_diagnostic_match, Platform.SWITCH +) + +_LOGGER = logging.getLogger(__name__) async def async_setup_entry( @@ -138,3 +150,118 @@ class SwitchGroup(ZhaGroupEntity, SwitchEntity): self._state = len(on_states) > 0 self._available = any(state.state != STATE_UNAVAILABLE for state in states) + + +class ZHASwitchConfigurationEntity(ZhaEntity, SwitchEntity): + """Representation of a ZHA switch configuration entity.""" + + _zcl_attribute: str + _zcl_inverter_attribute: str = "" + + @classmethod + def create_entity( + cls, + unique_id: str, + zha_device: ZHADevice, + channels: list[ZigbeeChannel], + **kwargs, + ) -> ZhaEntity | None: + """Entity Factory. + + Return entity if it is a supported configuration, otherwise return None + """ + channel = channels[0] + if ( + cls._zcl_attribute in channel.cluster.unsupported_attributes + or channel.cluster.get(cls._zcl_attribute) is None + ): + _LOGGER.debug( + "%s is not supported - skipping %s entity creation", + cls._zcl_attribute, + cls.__name__, + ) + return None + + return cls(unique_id, zha_device, channels, **kwargs) + + def __init__( + self, + unique_id: str, + zha_device: ZHADevice, + channels: list[ZigbeeChannel], + **kwargs, + ) -> None: + """Init this number configuration entity.""" + self._channel: ZigbeeChannel = channels[0] + super().__init__(unique_id, zha_device, channels, **kwargs) + + async def async_added_to_hass(self) -> None: + """Run when about to be added to hass.""" + await super().async_added_to_hass() + self.async_accept_signal( + self._channel, SIGNAL_ATTR_UPDATED, self.async_set_state + ) + + @callback + def async_set_state(self, attr_id: int, attr_name: str, value: Any): + """Handle state update from channel.""" + self.async_write_ha_state() + + @property + def is_on(self) -> bool: + """Return if the switch is on based on the statemachine.""" + val = bool(self._channel.cluster.get(self._zcl_attribute)) + invert = bool(self._channel.cluster.get(self._zcl_inverter_attribute)) + return (not val) if invert else val + + async def async_turn_on_off(self, state) -> None: + """Turn the entity on or off.""" + try: + invert = bool(self._channel.cluster.get(self._zcl_inverter_attribute)) + result = await self._channel.cluster.write_attributes( + {self._zcl_attribute: not state if invert else state} + ) + except zigpy.exceptions.ZigbeeException as ex: + self.error("Could not set value: %s", ex) + return + if not isinstance(result, Exception) and all( + record.status == Status.SUCCESS for record in result[0] + ): + self.async_write_ha_state() + + async def async_turn_on(self, **kwargs) -> None: + """Turn the entity on.""" + await self.async_turn_on_off(True) + + async def async_turn_off(self, **kwargs) -> None: + """Turn the entity off.""" + await self.async_turn_on_off(False) + + async def async_update(self) -> None: + """Attempt to retrieve the state of the entity.""" + await super().async_update() + _LOGGER.error("Polling current state") + if self._channel: + value = await self._channel.get_attribute_value( + self._zcl_attribute, from_cache=False + ) + invert = await self._channel.get_attribute_value( + self._zcl_inverter_attribute, from_cache=False + ) + _LOGGER.debug("read value=%s, inverter=%s", value, bool(invert)) + + +@CONFIG_DIAGNOSTIC_MATCH( + channel_names="tuya_manufacturer", + manufacturers={ + "_TZE200_b6wax7g0", + }, +) +class OnOffWindowDetectionFunctionConfigurationEntity( + ZHASwitchConfigurationEntity, id_suffix="on_off_window_opened_detection" +): + """Representation of a ZHA on off transition time configuration entity.""" + + _attr_entity_category = EntityCategory.CONFIG + _zcl_attribute = "window_detection_function" + _zcl_inverter_attribute = "window_detection_function_inverter" diff --git a/tests/components/zha/test_switch.py b/tests/components/zha/test_switch.py index a624e5f2c73..99e8a681348 100644 --- a/tests/components/zha/test_switch.py +++ b/tests/components/zha/test_switch.py @@ -2,13 +2,25 @@ from unittest.mock import call, patch import pytest +from zhaquirks.const import ( + DEVICE_TYPE, + ENDPOINTS, + INPUT_CLUSTERS, + OUTPUT_CLUSTERS, + PROFILE_ID, +) +from zigpy.exceptions import ZigbeeException import zigpy.profiles.zha as zha +from zigpy.quirks import CustomCluster, CustomDevice +import zigpy.types as t import zigpy.zcl.clusters.general as general +from zigpy.zcl.clusters.manufacturer_specific import ManufacturerSpecificCluster import zigpy.zcl.foundation as zcl_f from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN from homeassistant.components.zha.core.group import GroupMember from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, Platform +from homeassistant.setup import async_setup_component from .common import ( async_enable_traffic, @@ -174,6 +186,61 @@ async def test_switch(hass, zha_device_joined_restored, zigpy_device): await async_test_rejoin(hass, zigpy_device, [cluster], (1,)) +class WindowDetectionFunctionQuirk(CustomDevice): + """Quirk with window detection function attribute.""" + + class TuyaManufCluster(CustomCluster, ManufacturerSpecificCluster): + """Tuya manufacturer specific cluster.""" + + cluster_id = 0xEF00 + ep_attribute = "tuya_manufacturer" + + attributes = { + 0xEF01: ("window_detection_function", t.Bool), + 0xEF02: ("window_detection_function_inverter", t.Bool), + } + + def __init__(self, *args, **kwargs): + """Initialize with task.""" + super().__init__(*args, **kwargs) + self._attr_cache.update( + {0xEF01: False} + ) # entity won't be created without this + + replacement = { + ENDPOINTS: { + 1: { + PROFILE_ID: zha.PROFILE_ID, + DEVICE_TYPE: zha.DeviceType.ON_OFF_SWITCH, + INPUT_CLUSTERS: [general.Basic.cluster_id, TuyaManufCluster], + OUTPUT_CLUSTERS: [], + }, + } + } + + +@pytest.fixture +async def zigpy_device_tuya(hass, zigpy_device_mock, zha_device_joined): + """Device tracker zigpy tuya device.""" + + zigpy_device = zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [general.Basic.cluster_id], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha.DeviceType.ON_OFF_SWITCH, + } + }, + manufacturer="_TZE200_b6wax7g0", + quirk=WindowDetectionFunctionQuirk, + ) + + zha_device = await zha_device_joined(zigpy_device) + zha_device.available = True + await hass.async_block_till_done() + return zigpy_device + + @patch( "homeassistant.components.zha.entity.UPDATE_GROUP_FROM_CHILD_DELAY", new=0, @@ -292,3 +359,122 @@ async def test_zha_group_switch_entity( # test that group light is now back on assert hass.states.get(entity_id).state == STATE_ON + + +async def test_switch_configurable(hass, zha_device_joined_restored, zigpy_device_tuya): + """Test zha configurable switch platform.""" + + zha_device = await zha_device_joined_restored(zigpy_device_tuya) + cluster = zigpy_device_tuya.endpoints.get(1).tuya_manufacturer + entity_id = await find_entity_id(Platform.SWITCH, zha_device, hass) + assert entity_id is not None + + assert hass.states.get(entity_id).state == STATE_OFF + await async_enable_traffic(hass, [zha_device], enabled=False) + # test that the switch was created and that its state is unavailable + assert hass.states.get(entity_id).state == STATE_UNAVAILABLE + + # allow traffic to flow through the gateway and device + await async_enable_traffic(hass, [zha_device]) + + # test that the state has changed from unavailable to off + assert hass.states.get(entity_id).state == STATE_OFF + + # turn on at switch + await send_attributes_report(hass, cluster, {"window_detection_function": True}) + assert hass.states.get(entity_id).state == STATE_ON + + # turn off at switch + await send_attributes_report(hass, cluster, {"window_detection_function": False}) + assert hass.states.get(entity_id).state == STATE_OFF + + # turn on from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]), + ): + # turn on via UI + await hass.services.async_call( + SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True + ) + assert len(cluster.write_attributes.mock_calls) == 1 + assert cluster.write_attributes.call_args == call( + {"window_detection_function": True} + ) + + # turn off from HA + with patch( + "zigpy.zcl.Cluster.write_attributes", + return_value=mock_coro([zcl_f.Status.SUCCESS, zcl_f.Status.SUCCESS]), + ): + # turn off via UI + await hass.services.async_call( + SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + assert len(cluster.write_attributes.mock_calls) == 2 + assert cluster.write_attributes.call_args == call( + {"window_detection_function": False} + ) + + cluster.read_attributes.reset_mock() + await async_setup_component(hass, "homeassistant", {}) + await hass.async_block_till_done() + + await hass.services.async_call( + "homeassistant", "update_entity", {"entity_id": entity_id}, blocking=True + ) + # the mocking doesn't update the attr cache so this flips back to initial value + assert cluster.read_attributes.call_count == 2 + assert [ + call( + [ + "window_detection_function", + ], + allow_cache=False, + only_cache=False, + manufacturer=None, + ), + call( + [ + "window_detection_function_inverter", + ], + allow_cache=False, + only_cache=False, + manufacturer=None, + ), + ] == cluster.read_attributes.call_args_list + + cluster.write_attributes.reset_mock() + cluster.write_attributes.side_effect = ZigbeeException + + await hass.services.async_call( + SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + + assert len(cluster.write_attributes.mock_calls) == 1 + assert cluster.write_attributes.call_args == call( + {"window_detection_function": False} + ) + + # test inverter + cluster.write_attributes.reset_mock() + cluster._attr_cache.update({0xEF02: True}) + + await hass.services.async_call( + SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + assert len(cluster.write_attributes.mock_calls) == 1 + assert cluster.write_attributes.call_args == call( + {"window_detection_function": True} + ) + + await hass.services.async_call( + SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True + ) + assert len(cluster.write_attributes.mock_calls) == 2 + assert cluster.write_attributes.call_args == call( + {"window_detection_function": False} + ) + + # test joining a new switch to the network and HA + await async_test_rejoin(hass, zigpy_device_tuya, [cluster], (0,))