diff --git a/homeassistant/components/zha/core/cluster_handlers/__init__.py b/homeassistant/components/zha/core/cluster_handlers/__init__.py index 65137d683de..6c65a993e95 100644 --- a/homeassistant/components/zha/core/cluster_handlers/__init__.py +++ b/homeassistant/components/zha/core/cluster_handlers/__init__.py @@ -424,10 +424,18 @@ class ClusterHandler(LogMixin): @callback def attribute_updated(self, attrid: int, value: Any, _: Any) -> None: """Handle attribute updates on this cluster.""" + attr_name = self._get_attribute_name(attrid) + self.debug( + "cluster_handler[%s] attribute_updated - cluster[%s] attr[%s] value[%s]", + self.name, + self.cluster.name, + attr_name, + value, + ) self.async_send_signal( f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, - self._get_attribute_name(attrid), + attr_name, value, ) diff --git a/homeassistant/components/zha/core/cluster_handlers/closures.py b/homeassistant/components/zha/core/cluster_handlers/closures.py index a7056fe9a9f..13ca6f92aaf 100644 --- a/homeassistant/components/zha/core/cluster_handlers/closures.py +++ b/homeassistant/components/zha/core/cluster_handlers/closures.py @@ -1,10 +1,10 @@ """Closures cluster handlers module for Zigbee Home Automation.""" from __future__ import annotations -from typing import TYPE_CHECKING, Any +from typing import Any -import zigpy.zcl -from zigpy.zcl.clusters.closures import DoorLock, Shade, WindowCovering +import zigpy.types as t +from zigpy.zcl.clusters.closures import ConfigStatus, DoorLock, Shade, WindowCovering from homeassistant.core import callback @@ -12,9 +12,6 @@ from .. import registries from ..const import REPORT_CONFIG_IMMEDIATE, SIGNAL_ATTR_UPDATED from . import AttrReportConfig, ClientClusterHandler, ClusterHandler -if TYPE_CHECKING: - from ..endpoint import Endpoint - @registries.ZIGBEE_CLUSTER_HANDLER_REGISTRY.register(DoorLock.cluster_id) class DoorLockClusterHandler(ClusterHandler): @@ -53,7 +50,7 @@ class DoorLockClusterHandler(ClusterHandler): command_name = self._cluster.client_commands[command_id].name - if command_name == "operation_event_notification": + if command_name == DoorLock.ClientCommandDefs.operation_event_notification.name: self.zha_send_event( command_name, { @@ -138,62 +135,140 @@ class WindowCoveringClientClusterHandler(ClientClusterHandler): class WindowCoveringClusterHandler(ClusterHandler): """Window cluster handler.""" - _value_attribute_lift = ( - WindowCovering.AttributeDefs.current_position_lift_percentage.id - ) - _value_attribute_tilt = ( - WindowCovering.AttributeDefs.current_position_tilt_percentage.id - ) REPORT_CONFIG = ( AttrReportConfig( - attr="current_position_lift_percentage", config=REPORT_CONFIG_IMMEDIATE + attr=WindowCovering.AttributeDefs.current_position_lift_percentage.name, + config=REPORT_CONFIG_IMMEDIATE, ), AttrReportConfig( - attr="current_position_tilt_percentage", config=REPORT_CONFIG_IMMEDIATE + attr=WindowCovering.AttributeDefs.current_position_tilt_percentage.name, + config=REPORT_CONFIG_IMMEDIATE, ), ) - def __init__(self, cluster: zigpy.zcl.Cluster, endpoint: Endpoint) -> None: - """Initialize WindowCovering cluster handler.""" - super().__init__(cluster, endpoint) - - if self.cluster.endpoint.model == "lumi.curtain.agl001": - self.ZCL_INIT_ATTRS = self.ZCL_INIT_ATTRS.copy() - self.ZCL_INIT_ATTRS["window_covering_mode"] = True + ZCL_INIT_ATTRS = { + WindowCovering.AttributeDefs.window_covering_type.name: True, + WindowCovering.AttributeDefs.window_covering_mode.name: True, + WindowCovering.AttributeDefs.config_status.name: True, + WindowCovering.AttributeDefs.installed_closed_limit_lift.name: True, + WindowCovering.AttributeDefs.installed_closed_limit_tilt.name: True, + WindowCovering.AttributeDefs.installed_open_limit_lift.name: True, + WindowCovering.AttributeDefs.installed_open_limit_tilt.name: True, + } async def async_update(self): """Retrieve latest state.""" - result = await self.get_attribute_value( - "current_position_lift_percentage", from_cache=False + results = await self.get_attributes( + [ + WindowCovering.AttributeDefs.current_position_lift_percentage.name, + WindowCovering.AttributeDefs.current_position_tilt_percentage.name, + ], + from_cache=False, + only_cache=False, ) - self.debug("read current position: %s", result) - if result is not None: - self.async_send_signal( - f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", - self._value_attribute_lift, - "current_position_lift_percentage", - result, + self.debug( + "read current_position_lift_percentage and current_position_tilt_percentage - results: %s", + results, + ) + if ( + results + and results.get( + WindowCovering.AttributeDefs.current_position_lift_percentage.name ) - result = await self.get_attribute_value( - "current_position_tilt_percentage", from_cache=False - ) - self.debug("read current tilt position: %s", result) - if result is not None: + is not None + ): + # the 100 - value is because we need to invert the value before giving it to the entity self.async_send_signal( f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", - self._value_attribute_tilt, - "current_position_tilt_percentage", - result, + WindowCovering.AttributeDefs.current_position_lift_percentage.id, + WindowCovering.AttributeDefs.current_position_lift_percentage.name, + 100 + - results.get( + WindowCovering.AttributeDefs.current_position_lift_percentage.name + ), + ) + if ( + results + and results.get( + WindowCovering.AttributeDefs.current_position_tilt_percentage.name + ) + is not None + ): + # the 100 - value is because we need to invert the value before giving it to the entity + self.async_send_signal( + f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", + WindowCovering.AttributeDefs.current_position_tilt_percentage.id, + WindowCovering.AttributeDefs.current_position_tilt_percentage.name, + 100 + - results.get( + WindowCovering.AttributeDefs.current_position_tilt_percentage.name + ), ) - @callback - def attribute_updated(self, attrid: int, value: Any, _: Any) -> None: - """Handle attribute update from window_covering cluster.""" - attr_name = self._get_attribute_name(attrid) - self.debug( - "Attribute report '%s'[%s] = %s", self.cluster.name, attr_name, value + @property + def inverted(self): + """Return true if the window covering is inverted.""" + config_status = self.cluster.get( + WindowCovering.AttributeDefs.config_status.name + ) + return ( + config_status is not None + and ConfigStatus.Open_up_commands_reversed in ConfigStatus(config_status) + ) + + @property + def current_position_lift_percentage(self) -> t.uint16_t | None: + """Return the current lift percentage of the window covering.""" + lift_percentage = self.cluster.get( + WindowCovering.AttributeDefs.current_position_lift_percentage.name + ) + if lift_percentage is not None: + # the 100 - value is because we need to invert the value before giving it to the entity + lift_percentage = 100 - lift_percentage + return lift_percentage + + @property + def current_position_tilt_percentage(self) -> t.uint16_t | None: + """Return the current tilt percentage of the window covering.""" + tilt_percentage = self.cluster.get( + WindowCovering.AttributeDefs.current_position_tilt_percentage.name + ) + if tilt_percentage is not None: + # the 100 - value is because we need to invert the value before giving it to the entity + tilt_percentage = 100 - tilt_percentage + return tilt_percentage + + @property + def installed_open_limit_lift(self) -> t.uint16_t | None: + """Return the installed open lift limit of the window covering.""" + return self.cluster.get( + WindowCovering.AttributeDefs.installed_open_limit_lift.name + ) + + @property + def installed_closed_limit_lift(self) -> t.uint16_t | None: + """Return the installed closed lift limit of the window covering.""" + return self.cluster.get( + WindowCovering.AttributeDefs.installed_closed_limit_lift.name + ) + + @property + def installed_open_limit_tilt(self) -> t.uint16_t | None: + """Return the installed open tilt limit of the window covering.""" + return self.cluster.get( + WindowCovering.AttributeDefs.installed_open_limit_tilt.name + ) + + @property + def installed_closed_limit_tilt(self) -> t.uint16_t | None: + """Return the installed closed tilt limit of the window covering.""" + return self.cluster.get( + WindowCovering.AttributeDefs.installed_closed_limit_tilt.name + ) + + @property + def window_covering_type(self) -> WindowCovering.WindowCoveringType: + """Return the window covering type.""" + return WindowCovering.WindowCoveringType( + self.cluster.get(WindowCovering.AttributeDefs.window_covering_type.name) ) - if attrid in (self._value_attribute_lift, self._value_attribute_tilt): - self.async_send_signal( - f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value - ) diff --git a/homeassistant/components/zha/cover.py b/homeassistant/components/zha/cover.py index f36cbc13533..d94a2f907d1 100644 --- a/homeassistant/components/zha/cover.py +++ b/homeassistant/components/zha/cover.py @@ -4,8 +4,9 @@ from __future__ import annotations import asyncio import functools import logging -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast +from zigpy.zcl.clusters.closures import WindowCovering as WindowCoveringCluster from zigpy.zcl.foundation import Status from homeassistant.components.cover import ( @@ -14,6 +15,7 @@ from homeassistant.components.cover import ( ATTR_TILT_POSITION, CoverDeviceClass, CoverEntity, + CoverEntityFeature, ) from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( @@ -29,6 +31,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect from homeassistant.helpers.entity_platform import AddEntitiesCallback from .core import discovery +from .core.cluster_handlers.closures import WindowCoveringClusterHandler from .core.const import ( CLUSTER_HANDLER_COVER, CLUSTER_HANDLER_LEVEL, @@ -70,40 +73,145 @@ async def async_setup_entry( config_entry.async_on_unload(unsub) +WCAttrs = WindowCoveringCluster.AttributeDefs +WCT = WindowCoveringCluster.WindowCoveringType +WCCS = WindowCoveringCluster.ConfigStatus + +ZCL_TO_COVER_DEVICE_CLASS = { + WCT.Awning: CoverDeviceClass.AWNING, + WCT.Drapery: CoverDeviceClass.CURTAIN, + WCT.Projector_screen: CoverDeviceClass.SHADE, + WCT.Rollershade: CoverDeviceClass.SHADE, + WCT.Rollershade_two_motors: CoverDeviceClass.SHADE, + WCT.Rollershade_exterior: CoverDeviceClass.SHADE, + WCT.Rollershade_exterior_two_motors: CoverDeviceClass.SHADE, + WCT.Shutter: CoverDeviceClass.SHUTTER, + WCT.Tilt_blind_tilt_only: CoverDeviceClass.BLIND, + WCT.Tilt_blind_tilt_and_lift: CoverDeviceClass.BLIND, +} + + @MULTI_MATCH(cluster_handler_names=CLUSTER_HANDLER_COVER) class ZhaCover(ZhaEntity, CoverEntity): """Representation of a ZHA cover.""" _attr_translation_key: str = "cover" - def __init__(self, unique_id, zha_device, cluster_handlers, **kwargs): - """Init this sensor.""" + def __init__( + self, + unique_id: str, + zha_device: ZHADevice, + cluster_handlers: list[ClusterHandler], + **kwargs: Any, + ) -> None: + """Init this cover.""" super().__init__(unique_id, zha_device, cluster_handlers, **kwargs) - self._cover_cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER) - self._current_position = None - self._tilt_position = None + cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER) + assert cluster_handler + self._cover_cluster_handler: WindowCoveringClusterHandler = cast( + WindowCoveringClusterHandler, cluster_handler + ) + if self._cover_cluster_handler.window_covering_type: + self._attr_device_class: CoverDeviceClass | None = ( + ZCL_TO_COVER_DEVICE_CLASS.get( + self._cover_cluster_handler.window_covering_type + ) + ) + self._attr_supported_features: CoverEntityFeature = ( + self._determine_supported_features() + ) + self._target_lift_position: int | None = None + self._target_tilt_position: int | None = None + self._determine_initial_state() + + def _determine_supported_features(self) -> CoverEntityFeature: + """Determine the supported cover features.""" + supported_features: CoverEntityFeature = ( + CoverEntityFeature.OPEN + | CoverEntityFeature.CLOSE + | CoverEntityFeature.STOP + | CoverEntityFeature.SET_POSITION + ) + if ( + self._cover_cluster_handler.window_covering_type + and self._cover_cluster_handler.window_covering_type + in ( + WCT.Shutter, + WCT.Tilt_blind_tilt_only, + WCT.Tilt_blind_tilt_and_lift, + ) + ): + supported_features |= CoverEntityFeature.SET_TILT_POSITION + supported_features |= CoverEntityFeature.OPEN_TILT + supported_features |= CoverEntityFeature.CLOSE_TILT + supported_features |= CoverEntityFeature.STOP_TILT + return supported_features + + def _determine_initial_state(self) -> None: + """Determine the initial state of the cover.""" + if ( + self._cover_cluster_handler.window_covering_type + and self._cover_cluster_handler.window_covering_type + in ( + WCT.Shutter, + WCT.Tilt_blind_tilt_only, + WCT.Tilt_blind_tilt_and_lift, + ) + ): + self._determine_state( + self.current_cover_tilt_position, is_lift_update=False + ) + if ( + self._cover_cluster_handler.window_covering_type + == WCT.Tilt_blind_tilt_and_lift + ): + state = self._state + self._determine_state(self.current_cover_position) + if state == STATE_OPEN and self._state == STATE_CLOSED: + # let the tilt state override the lift state + self._state = STATE_OPEN + else: + self._determine_state(self.current_cover_position) + + def _determine_state(self, position_or_tilt, is_lift_update=True) -> None: + """Determine the state of the cover. + + In HA None is unknown, 0 is closed, 100 is fully open. + In ZCL 0 is fully open, 100 is fully closed. + Keep in mind the values have already been flipped to match HA + in the WindowCovering cluster handler + """ + if is_lift_update: + target = self._target_lift_position + current = self.current_cover_position + else: + target = self._target_tilt_position + current = self.current_cover_tilt_position + + if position_or_tilt == 100: + self._state = STATE_CLOSED + return + if target is not None and target != current: + # we are mid transition and shouldn't update the state + return + self._state = STATE_OPEN async def async_added_to_hass(self) -> None: - """Run when about to be added to hass.""" + """Run when the cover entity is about to be added to hass.""" await super().async_added_to_hass() self.async_accept_signal( - self._cover_cluster_handler, SIGNAL_ATTR_UPDATED, self.async_set_position + self._cover_cluster_handler, SIGNAL_ATTR_UPDATED, self.zcl_attribute_updated ) - @callback - def async_restore_last_state(self, last_state): - """Restore previous state.""" - self._state = last_state.state - if "current_position" in last_state.attributes: - self._current_position = last_state.attributes["current_position"] - if "current_tilt_position" in last_state.attributes: - self._tilt_position = last_state.attributes[ - "current_tilt_position" - ] # first allocation activate tilt - @property def is_closed(self) -> bool | None: - """Return if the cover is closed.""" + """Return True if the cover is closed. + + In HA None is unknown, 0 is closed, 100 is fully open. + In ZCL 0 is fully open, 100 is fully closed. + Keep in mind the values have already been flipped to match HA + in the WindowCovering cluster handler + """ if self.current_cover_position is None: return None return self.current_cover_position == 0 @@ -122,39 +230,45 @@ class ZhaCover(ZhaEntity, CoverEntity): def current_cover_position(self) -> int | None: """Return the current position of ZHA cover. - None is unknown, 0 is closed, 100 is fully open. + In HA None is unknown, 0 is closed, 100 is fully open. + In ZCL 0 is fully open, 100 is fully closed. + Keep in mind the values have already been flipped to match HA + in the WindowCovering cluster handler """ - return self._current_position + return self._cover_cluster_handler.current_position_lift_percentage @property def current_cover_tilt_position(self) -> int | None: """Return the current tilt position of the cover.""" - return self._tilt_position + return self._cover_cluster_handler.current_position_tilt_percentage @callback - def async_set_position(self, attr_id, attr_name, value): + def zcl_attribute_updated(self, attr_id, attr_name, value): """Handle position update from cluster handler.""" - _LOGGER.debug("setting position: %s %s %s", attr_id, attr_name, value) - if attr_name == "current_position_lift_percentage": - self._current_position = 100 - value - elif attr_name == "current_position_tilt_percentage": - self._tilt_position = 100 - value - - if self._current_position == 0: - self._state = STATE_CLOSED - elif self._current_position == 100: - self._state = STATE_OPEN + if attr_id in ( + WCAttrs.current_position_lift_percentage.id, + WCAttrs.current_position_tilt_percentage.id, + ): + value = ( + self.current_cover_position + if attr_id == WCAttrs.current_position_lift_percentage.id + else self.current_cover_tilt_position + ) + self._determine_state( + value, + is_lift_update=attr_id == WCAttrs.current_position_lift_percentage.id, + ) self.async_write_ha_state() @callback def async_update_state(self, state): - """Handle state update from cluster handler.""" - _LOGGER.debug("state=%s", state) + """Handle state update from HA operations below.""" + _LOGGER.debug("async_update_state=%s", state) self._state = state self.async_write_ha_state() async def async_open_cover(self, **kwargs: Any) -> None: - """Open the window cover.""" + """Open the cover.""" res = await self._cover_cluster_handler.up_open() if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to open cover: {res[1]}") @@ -162,13 +276,14 @@ class ZhaCover(ZhaEntity, CoverEntity): async def async_open_cover_tilt(self, **kwargs: Any) -> None: """Open the cover tilt.""" + # 0 is open in ZCL res = await self._cover_cluster_handler.go_to_tilt_percentage(0) if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to open cover tilt: {res[1]}") self.async_update_state(STATE_OPENING) async def async_close_cover(self, **kwargs: Any) -> None: - """Close the window cover.""" + """Close the cover.""" res = await self._cover_cluster_handler.down_close() if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to close cover: {res[1]}") @@ -176,42 +291,63 @@ class ZhaCover(ZhaEntity, CoverEntity): async def async_close_cover_tilt(self, **kwargs: Any) -> None: """Close the cover tilt.""" + # 100 is closed in ZCL res = await self._cover_cluster_handler.go_to_tilt_percentage(100) if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to close cover tilt: {res[1]}") self.async_update_state(STATE_CLOSING) async def async_set_cover_position(self, **kwargs: Any) -> None: - """Move the roller shutter to a specific position.""" - new_pos = kwargs[ATTR_POSITION] - res = await self._cover_cluster_handler.go_to_lift_percentage(100 - new_pos) + """Move the cover to a specific position.""" + self._target_lift_position = kwargs[ATTR_POSITION] + assert self._target_lift_position is not None + assert self.current_cover_position is not None + # the 100 - value is because we need to invert the value before giving it to ZCL + res = await self._cover_cluster_handler.go_to_lift_percentage( + 100 - self._target_lift_position + ) if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to set cover position: {res[1]}") self.async_update_state( - STATE_CLOSING if new_pos < self._current_position else STATE_OPENING + STATE_CLOSING + if self._target_lift_position < self.current_cover_position + else STATE_OPENING ) async def async_set_cover_tilt_position(self, **kwargs: Any) -> None: - """Move the cover til to a specific position.""" - new_pos = kwargs[ATTR_TILT_POSITION] - res = await self._cover_cluster_handler.go_to_tilt_percentage(100 - new_pos) + """Move the cover tilt to a specific position.""" + self._target_tilt_position = kwargs[ATTR_TILT_POSITION] + assert self._target_tilt_position is not None + assert self.current_cover_tilt_position is not None + # the 100 - value is because we need to invert the value before giving it to ZCL + res = await self._cover_cluster_handler.go_to_tilt_percentage( + 100 - self._target_tilt_position + ) if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to set cover tilt position: {res[1]}") self.async_update_state( - STATE_CLOSING if new_pos < self._tilt_position else STATE_OPENING + STATE_CLOSING + if self._target_tilt_position < self.current_cover_tilt_position + else STATE_OPENING ) async def async_stop_cover(self, **kwargs: Any) -> None: - """Stop the window cover.""" + """Stop the cover.""" res = await self._cover_cluster_handler.stop() if res[1] is not Status.SUCCESS: raise HomeAssistantError(f"Failed to stop cover: {res[1]}") - self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED + self._target_lift_position = self.current_cover_position + self._determine_state(self.current_cover_position) self.async_write_ha_state() async def async_stop_cover_tilt(self, **kwargs: Any) -> None: """Stop the cover tilt.""" - await self.async_stop_cover() + res = await self._cover_cluster_handler.stop() + if res[1] is not Status.SUCCESS: + raise HomeAssistantError(f"Failed to stop cover: {res[1]}") + self._target_tilt_position = self.current_cover_tilt_position + self._determine_state(self.current_cover_tilt_position, is_lift_update=False) + self.async_write_ha_state() @MULTI_MATCH( diff --git a/homeassistant/components/zha/strings.json b/homeassistant/components/zha/strings.json index e2875550398..08c485f01b3 100644 --- a/homeassistant/components/zha/strings.json +++ b/homeassistant/components/zha/strings.json @@ -905,6 +905,9 @@ "invert_switch": { "name": "Invert switch" }, + "inverted": { + "name": "Inverted" + }, "smart_bulb_mode": { "name": "Smart bulb mode" }, diff --git a/homeassistant/components/zha/switch.py b/homeassistant/components/zha/switch.py index d4e835751f5..57b84bd1aa1 100644 --- a/homeassistant/components/zha/switch.py +++ b/homeassistant/components/zha/switch.py @@ -6,6 +6,7 @@ import logging from typing import TYPE_CHECKING, Any, Self from zhaquirks.quirk_ids import TUYA_PLUG_ONOFF +from zigpy.zcl.clusters.closures import ConfigStatus, WindowCovering, WindowCoveringMode from zigpy.zcl.clusters.general import OnOff from zigpy.zcl.foundation import Status @@ -19,6 +20,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback from .core import discovery from .core.const import ( CLUSTER_HANDLER_BASIC, + CLUSTER_HANDLER_COVER, CLUSTER_HANDLER_INOVELLI, CLUSTER_HANDLER_ON_OFF, SIGNAL_ADD_ENTITIES, @@ -588,3 +590,62 @@ class AqaraBuzzerManualAlarm(ZHASwitchConfigurationEntity): _attribute_name = "buzzer_manual_alarm" _attr_translation_key = "buzzer_manual_alarm" _attr_icon: str = "mdi:bullhorn" + + +@CONFIG_DIAGNOSTIC_MATCH(cluster_handler_names=CLUSTER_HANDLER_COVER) +class WindowCoveringInversionSwitch(ZHASwitchConfigurationEntity): + """Representation of a switch that controls inversion for window covering devices. + + This is necessary because this cluster uses 2 attributes to control inversion. + """ + + _unique_id_suffix = "inverted" + _attribute_name = WindowCovering.AttributeDefs.config_status.name + _attr_translation_key = "inverted" + _attr_icon: str = "mdi:arrow-up-down" + + @property + def is_on(self) -> bool: + """Return if the switch is on based on the statemachine.""" + config_status = ConfigStatus( + self._cluster_handler.cluster.get(self._attribute_name) + ) + return ConfigStatus.Open_up_commands_reversed in config_status + + async def async_turn_on(self, **kwargs: Any) -> None: + """Turn the entity on.""" + await self._async_on_off(True) + + async def async_turn_off(self, **kwargs: Any) -> None: + """Turn the entity off.""" + await self._async_on_off(False) + + async def async_update(self) -> None: + """Attempt to retrieve the state of the entity.""" + self.debug("Polling current state") + await self._cluster_handler.get_attributes( + [ + self._attribute_name, + WindowCovering.AttributeDefs.window_covering_mode.name, + ], + from_cache=False, + only_cache=False, + ) + self.async_write_ha_state() + + async def _async_on_off(self, invert: bool) -> None: + """Turn the entity on or off.""" + name: str = WindowCovering.AttributeDefs.window_covering_mode.name + current_mode: WindowCoveringMode = WindowCoveringMode( + self._cluster_handler.cluster.get(name) + ) + send_command: bool = False + if invert and WindowCoveringMode.Motor_direction_reversed not in current_mode: + current_mode |= WindowCoveringMode.Motor_direction_reversed + send_command = True + elif not invert and WindowCoveringMode.Motor_direction_reversed in current_mode: + current_mode &= ~WindowCoveringMode.Motor_direction_reversed + send_command = True + if send_command: + await self._cluster_handler.write_attributes_safe({name: current_mode}) + await self.async_update() diff --git a/tests/components/zha/test_cover.py b/tests/components/zha/test_cover.py index 5a1f2a862ac..55a4cbebfe7 100644 --- a/tests/components/zha/test_cover.py +++ b/tests/components/zha/test_cover.py @@ -28,7 +28,9 @@ from homeassistant.components.zha.core.const import ZHA_EVENT from homeassistant.const import ( ATTR_COMMAND, STATE_CLOSED, + STATE_CLOSING, STATE_OPEN, + STATE_OPENING, STATE_UNAVAILABLE, Platform, ) @@ -42,6 +44,7 @@ from .common import ( find_entity_id, make_zcl_header, send_attributes_report, + update_attribute_cache, ) from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE @@ -131,21 +134,40 @@ def zigpy_keen_vent(zigpy_device_mock): ) -async def test_cover( +WCAttrs = closures.WindowCovering.AttributeDefs +WCCmds = closures.WindowCovering.ServerCommandDefs +WCT = closures.WindowCovering.WindowCoveringType +WCCS = closures.WindowCovering.ConfigStatus + + +async def test_cover_non_tilt_initial_state( hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device ) -> None: """Test ZHA cover platform.""" # load up cover domain - cluster = zigpy_cover_device.endpoints.get(1).window_covering + cluster = zigpy_cover_device.endpoints[1].window_covering cluster.PLUGGED_ATTR_READS = { - "current_position_lift_percentage": 65, - "current_position_tilt_percentage": 42, + WCAttrs.current_position_lift_percentage.name: 0, + WCAttrs.window_covering_type.name: WCT.Drapery, + WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed), } + update_attribute_cache(cluster) zha_device = await zha_device_joined_restored(zigpy_cover_device) - assert cluster.read_attributes.call_count == 1 - assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0] - assert "current_position_tilt_percentage" in cluster.read_attributes.call_args[0][0] + assert ( + not zha_device.endpoints[1] + .all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"] + .inverted + ) + assert cluster.read_attributes.call_count == 3 + assert ( + WCAttrs.current_position_lift_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + assert ( + WCAttrs.current_position_tilt_percentage.name + in cluster.read_attributes.call_args[0][0] + ) entity_id = find_entity_id(Platform.COVER, zha_device, hass) assert entity_id is not None @@ -161,27 +183,86 @@ async def test_cover( # test update prev_call_count = cluster.read_attributes.call_count await async_update_entity(hass, entity_id) - assert cluster.read_attributes.call_count == prev_call_count + 2 + assert cluster.read_attributes.call_count == prev_call_count + 1 state = hass.states.get(entity_id) assert state assert state.state == STATE_OPEN - assert state.attributes[ATTR_CURRENT_POSITION] == 35 + assert state.attributes[ATTR_CURRENT_POSITION] == 100 + + +async def test_cover( + hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device +) -> None: + """Test ZHA cover platform.""" + + # load up cover domain + cluster = zigpy_cover_device.endpoints[1].window_covering + cluster.PLUGGED_ATTR_READS = { + WCAttrs.current_position_lift_percentage.name: 0, + WCAttrs.current_position_tilt_percentage.name: 42, + WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift, + WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed), + } + update_attribute_cache(cluster) + zha_device = await zha_device_joined_restored(zigpy_cover_device) + assert ( + not zha_device.endpoints[1] + .all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"] + .inverted + ) + assert cluster.read_attributes.call_count == 3 + assert ( + WCAttrs.current_position_lift_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + assert ( + WCAttrs.current_position_tilt_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + + entity_id = find_entity_id(Platform.COVER, zha_device, hass) + assert entity_id is not None + + await async_enable_traffic(hass, [zha_device], enabled=False) + # test that the cover was created and that it is unavailable + assert hass.states.get(entity_id).state == STATE_UNAVAILABLE + + # allow traffic to flow through the gateway and device + await async_enable_traffic(hass, [zha_device]) + await hass.async_block_till_done() + + # test update + prev_call_count = cluster.read_attributes.call_count + await async_update_entity(hass, entity_id) + assert cluster.read_attributes.call_count == prev_call_count + 1 + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OPEN + assert state.attributes[ATTR_CURRENT_POSITION] == 100 assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 58 # test that the state has changed from unavailable to off - await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1}) + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 100} + ) assert hass.states.get(entity_id).state == STATE_CLOSED # test to see if it opens - await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100}) + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 0} + ) assert hass.states.get(entity_id).state == STATE_OPEN # test that the state remains after tilting to 100% - await send_attributes_report(hass, cluster, {0: 0, 9: 100, 1: 1}) + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} + ) assert hass.states.get(entity_id).state == STATE_OPEN # test to see the state remains after tilting to 0% - await send_attributes_report(hass, cluster, {0: 1, 9: 0, 1: 100}) + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + ) assert hass.states.get(entity_id).state == STATE_OPEN # close from UI @@ -192,9 +273,17 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x01 - assert cluster.request.call_args[0][2].command.name == "down_close" + assert cluster.request.call_args[0][2].command.name == WCCmds.down_close.name assert cluster.request.call_args[1]["expect_reply"] is True + assert hass.states.get(entity_id).state == STATE_CLOSING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 100} + ) + + assert hass.states.get(entity_id).state == STATE_CLOSED + with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]): await hass.services.async_call( COVER_DOMAIN, @@ -205,10 +294,21 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x08 - assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert ( + cluster.request.call_args[0][2].command.name + == WCCmds.go_to_tilt_percentage.name + ) assert cluster.request.call_args[0][3] == 100 assert cluster.request.call_args[1]["expect_reply"] is True + assert hass.states.get(entity_id).state == STATE_CLOSING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 100} + ) + + assert hass.states.get(entity_id).state == STATE_CLOSED + # open from UI with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -217,9 +317,17 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x00 - assert cluster.request.call_args[0][2].command.name == "up_open" + assert cluster.request.call_args[0][2].command.name == WCCmds.up_open.name assert cluster.request.call_args[1]["expect_reply"] is True + assert hass.states.get(entity_id).state == STATE_OPENING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 0} + ) + + assert hass.states.get(entity_id).state == STATE_OPEN + with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]): await hass.services.async_call( COVER_DOMAIN, @@ -230,10 +338,21 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x08 - assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert ( + cluster.request.call_args[0][2].command.name + == WCCmds.go_to_tilt_percentage.name + ) assert cluster.request.call_args[0][3] == 0 assert cluster.request.call_args[1]["expect_reply"] is True + assert hass.states.get(entity_id).state == STATE_OPENING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + ) + + assert hass.states.get(entity_id).state == STATE_OPEN + # set position UI with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -245,10 +364,27 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x05 - assert cluster.request.call_args[0][2].command.name == "go_to_lift_percentage" + assert ( + cluster.request.call_args[0][2].command.name + == WCCmds.go_to_lift_percentage.name + ) assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True + assert hass.states.get(entity_id).state == STATE_CLOSING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 35} + ) + + assert hass.states.get(entity_id).state == STATE_CLOSING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 53} + ) + + assert hass.states.get(entity_id).state == STATE_OPEN + with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]): await hass.services.async_call( COVER_DOMAIN, @@ -259,10 +395,27 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x08 - assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert ( + cluster.request.call_args[0][2].command.name + == WCCmds.go_to_tilt_percentage.name + ) assert cluster.request.call_args[0][3] == 53 assert cluster.request.call_args[1]["expect_reply"] is True + assert hass.states.get(entity_id).state == STATE_CLOSING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 35} + ) + + assert hass.states.get(entity_id).state == STATE_CLOSING + + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_lift_percentage.id: 53} + ) + + assert hass.states.get(entity_id).state == STATE_OPEN + # stop from UI with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): await hass.services.async_call( @@ -271,7 +424,7 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x02 - assert cluster.request.call_args[0][2].command.name == "stop" + assert cluster.request.call_args[0][2].command.name == WCCmds.stop.name assert cluster.request.call_args[1]["expect_reply"] is True with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]): @@ -284,11 +437,11 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x02 - assert cluster.request.call_args[0][2].command.name == "stop" + assert cluster.request.call_args[0][2].command.name == WCCmds.stop.name assert cluster.request.call_args[1]["expect_reply"] is True # test rejoin - cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 0} + cluster.PLUGGED_ATTR_READS = {WCAttrs.current_position_lift_percentage.name: 0} await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,)) assert hass.states.get(entity_id).state == STATE_OPEN @@ -303,7 +456,10 @@ async def test_cover( assert cluster.request.call_count == 1 assert cluster.request.call_args[0][0] is False assert cluster.request.call_args[0][1] == 0x08 - assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage" + assert ( + cluster.request.call_args[0][2].command.name + == WCCmds.go_to_tilt_percentage.name + ) assert cluster.request.call_args[0][3] == 100 assert cluster.request.call_args[1]["expect_reply"] is True @@ -314,11 +470,12 @@ async def test_cover_failures( """Test ZHA cover platform failure cases.""" # load up cover domain - cluster = zigpy_cover_device.endpoints.get(1).window_covering + cluster = zigpy_cover_device.endpoints[1].window_covering cluster.PLUGGED_ATTR_READS = { - "current_position_lift_percentage": None, - "current_position_tilt_percentage": 42, + WCAttrs.current_position_tilt_percentage.name: 42, + WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift, } + update_attribute_cache(cluster) zha_device = await zha_device_joined_restored(zigpy_cover_device) entity_id = find_entity_id(Platform.COVER, zha_device, hass) @@ -331,7 +488,7 @@ async def test_cover_failures( # test update returned None prev_call_count = cluster.read_attributes.call_count await async_update_entity(hass, entity_id) - assert cluster.read_attributes.call_count == prev_call_count + 2 + assert cluster.read_attributes.call_count == prev_call_count + 1 assert hass.states.get(entity_id).state == STATE_UNAVAILABLE # allow traffic to flow through the gateway and device @@ -493,6 +650,27 @@ async def test_cover_failures( == closures.WindowCovering.ServerCommandDefs.stop.id ) + # stop from UI + with patch( + "zigpy.zcl.Cluster.request", + return_value=Default_Response( + command_id=closures.WindowCovering.ServerCommandDefs.stop.id, + status=zcl_f.Status.UNSUP_CLUSTER_COMMAND, + ), + ): + with pytest.raises(HomeAssistantError, match=r"Failed to stop cover"): + await hass.services.async_call( + COVER_DOMAIN, + SERVICE_STOP_COVER_TILT, + {"entity_id": entity_id}, + blocking=True, + ) + assert cluster.request.call_count == 1 + assert ( + cluster.request.call_args[0][1] + == closures.WindowCovering.ServerCommandDefs.stop.id + ) + async def test_shade( hass: HomeAssistant, zha_device_joined_restored, zigpy_shade_device @@ -502,8 +680,8 @@ async def test_shade( # load up cover domain zha_device = await zha_device_joined_restored(zigpy_shade_device) - cluster_on_off = zigpy_shade_device.endpoints.get(1).on_off - cluster_level = zigpy_shade_device.endpoints.get(1).level + cluster_on_off = zigpy_shade_device.endpoints[1].on_off + cluster_level = zigpy_shade_device.endpoints[1].level entity_id = find_entity_id(Platform.COVER, zha_device, hass) assert entity_id is not None @@ -700,16 +878,13 @@ async def test_cover_restore_state( hass: HomeAssistant, zha_device_restored, zigpy_cover_device ) -> None: """Ensure states are restored on startup.""" - mock_restore_cache( - hass, - ( - State( - "cover.fakemanufacturer_fakemodel_cover", - STATE_OPEN, - {ATTR_CURRENT_POSITION: 50, ATTR_CURRENT_TILT_POSITION: 42}, - ), - ), - ) + cluster = zigpy_cover_device.endpoints[1].window_covering + cluster.PLUGGED_ATTR_READS = { + WCAttrs.current_position_lift_percentage.name: 50, + WCAttrs.current_position_tilt_percentage.name: 42, + WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift, + } + update_attribute_cache(cluster) hass.set_state(CoreState.starting) @@ -719,8 +894,8 @@ async def test_cover_restore_state( # test that the cover was created and that it is available assert hass.states.get(entity_id).state == STATE_OPEN - assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50 - assert hass.states.get(entity_id).attributes[ATTR_CURRENT_TILT_POSITION] == 42 + assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 100 - 50 + assert hass.states.get(entity_id).attributes[ATTR_CURRENT_TILT_POSITION] == 100 - 42 async def test_keen_vent( @@ -731,8 +906,8 @@ async def test_keen_vent( # load up cover domain zha_device = await zha_device_joined_restored(zigpy_keen_vent) - cluster_on_off = zigpy_keen_vent.endpoints.get(1).on_off - cluster_level = zigpy_keen_vent.endpoints.get(1).level + cluster_on_off = zigpy_keen_vent.endpoints[1].on_off + cluster_level = zigpy_keen_vent.endpoints[1].level entity_id = find_entity_id(Platform.COVER, zha_device, hass) assert entity_id is not None diff --git a/tests/components/zha/test_switch.py b/tests/components/zha/test_switch.py index 0db9b7dd18e..cd25d17f84f 100644 --- a/tests/components/zha/test_switch.py +++ b/tests/components/zha/test_switch.py @@ -1,5 +1,5 @@ """Test ZHA switch.""" -from unittest.mock import call, patch +from unittest.mock import AsyncMock, call, patch import pytest from zhaquirks.const import ( @@ -13,6 +13,7 @@ from zigpy.exceptions import ZigbeeException import zigpy.profiles.zha as zha from zigpy.quirks import CustomCluster, CustomDevice import zigpy.types as t +import zigpy.zcl.clusters.closures as closures import zigpy.zcl.clusters.general as general from zigpy.zcl.clusters.manufacturer_specific import ManufacturerSpecificCluster import zigpy.zcl.foundation as zcl_f @@ -23,6 +24,7 @@ from homeassistant.components.zha.core.helpers import get_zha_gateway from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, Platform from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.entity_component import async_update_entity from homeassistant.setup import async_setup_component from .common import ( @@ -32,8 +34,9 @@ from .common import ( async_wait_for_updates, find_entity_id, send_attributes_report, + update_attribute_cache, ) -from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE +from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE ON = 1 OFF = 0 @@ -69,6 +72,24 @@ def zigpy_device(zigpy_device_mock): return zigpy_device_mock(endpoints) +@pytest.fixture +def zigpy_cover_device(zigpy_device_mock): + """Zigpy cover device.""" + + endpoints = { + 1: { + SIG_EP_PROFILE: zha.PROFILE_ID, + SIG_EP_TYPE: zha.DeviceType.WINDOW_COVERING_DEVICE, + SIG_EP_INPUT: [ + general.Basic.cluster_id, + closures.WindowCovering.cluster_id, + ], + SIG_EP_OUTPUT: [], + } + } + return zigpy_device_mock(endpoints) + + @pytest.fixture async def coordinator(hass, zigpy_device_mock, zha_device_joined): """Test ZHA light platform.""" @@ -136,7 +157,7 @@ async def test_switch( """Test ZHA switch platform.""" zha_device = await zha_device_joined_restored(zigpy_device) - cluster = zigpy_device.endpoints.get(1).on_off + cluster = zigpy_device.endpoints[1].on_off entity_id = find_entity_id(Platform.SWITCH, zha_device, hass) assert entity_id is not None @@ -177,6 +198,9 @@ async def test_switch( manufacturer=None, tsn=None, ) + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_ON # turn off from HA with patch( @@ -196,6 +220,9 @@ async def test_switch( manufacturer=None, tsn=None, ) + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OFF await async_setup_component(hass, "homeassistant", {}) @@ -338,6 +365,20 @@ async def test_zha_group_switch_entity( ) assert hass.states.get(entity_id).state == STATE_ON + # test turn off failure case + hold_off = group_cluster_on_off.off + group_cluster_on_off.off = AsyncMock(return_value=[0x01, zcl_f.Status.FAILURE]) + # turn off via UI + await hass.services.async_call( + SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + assert len(group_cluster_on_off.off.mock_calls) == 1 + + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_ON + group_cluster_on_off.off = hold_off + # turn off from HA with patch( "zigpy.zcl.Cluster.request", @@ -358,6 +399,20 @@ async def test_zha_group_switch_entity( ) assert hass.states.get(entity_id).state == STATE_OFF + # test turn on failure case + hold_on = group_cluster_on_off.on + group_cluster_on_off.on = AsyncMock(return_value=[0x01, zcl_f.Status.FAILURE]) + # turn on via UI + await hass.services.async_call( + SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True + ) + assert len(group_cluster_on_off.on.mock_calls) == 1 + + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OFF + group_cluster_on_off.on = hold_on + # test some of the group logic to make sure we key off states correctly await send_attributes_report(hass, dev1_cluster_on_off, {0: 1}) await send_attributes_report(hass, dev2_cluster_on_off, {0: 1}) @@ -391,7 +446,7 @@ async def test_switch_configurable( """Test ZHA configurable switch platform.""" zha_device = await zha_device_joined_restored(zigpy_device_tuya) - cluster = zigpy_device_tuya.endpoints.get(1).tuya_manufacturer + cluster = zigpy_device_tuya.endpoints[1].tuya_manufacturer entity_id = find_entity_id(Platform.SWITCH, zha_device, hass) assert entity_id is not None @@ -507,3 +562,125 @@ async def test_switch_configurable( # test joining a new switch to the network and HA await async_test_rejoin(hass, zigpy_device_tuya, [cluster], (0,)) + + +WCAttrs = closures.WindowCovering.AttributeDefs +WCT = closures.WindowCovering.WindowCoveringType +WCCS = closures.WindowCovering.ConfigStatus +WCM = closures.WindowCovering.WindowCoveringMode + + +async def test_cover_inversion_switch( + hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device +) -> None: + """Test ZHA cover platform.""" + + # load up cover domain + cluster = zigpy_cover_device.endpoints[1].window_covering + cluster.PLUGGED_ATTR_READS = { + WCAttrs.current_position_lift_percentage.name: 65, + WCAttrs.current_position_tilt_percentage.name: 42, + WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift, + WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed), + WCAttrs.window_covering_mode.name: WCM(WCM.LEDs_display_feedback), + } + update_attribute_cache(cluster) + zha_device = await zha_device_joined_restored(zigpy_cover_device) + assert ( + not zha_device.endpoints[1] + .all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"] + .inverted + ) + assert cluster.read_attributes.call_count == 3 + assert ( + WCAttrs.current_position_lift_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + assert ( + WCAttrs.current_position_tilt_percentage.name + in cluster.read_attributes.call_args[0][0] + ) + + entity_id = find_entity_id(Platform.SWITCH, zha_device, hass) + assert entity_id is not None + + await async_enable_traffic(hass, [zha_device], enabled=False) + # test that the cover was created and that it is unavailable + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_UNAVAILABLE + + # allow traffic to flow through the gateway and device + await async_enable_traffic(hass, [zha_device]) + await hass.async_block_till_done() + + # test update + prev_call_count = cluster.read_attributes.call_count + await async_update_entity(hass, entity_id) + assert cluster.read_attributes.call_count == prev_call_count + 1 + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OFF + + # test to see the state remains after tilting to 0% + await send_attributes_report( + hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 0} + ) + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OFF + + with patch( + "zigpy.zcl.Cluster.write_attributes", return_value=[0x1, zcl_f.Status.SUCCESS] + ): + cluster.PLUGGED_ATTR_READS = { + WCAttrs.config_status.name: WCCS.Operational + | WCCS.Open_up_commands_reversed, + } + # turn on from UI + await hass.services.async_call( + SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True + ) + assert cluster.write_attributes.call_count == 1 + assert cluster.write_attributes.call_args_list[0] == call( + { + WCAttrs.window_covering_mode.name: WCM.Motor_direction_reversed + | WCM.LEDs_display_feedback + }, + manufacturer=None, + ) + + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_ON + + cluster.write_attributes.reset_mock() + + # turn off from UI + cluster.PLUGGED_ATTR_READS = { + WCAttrs.config_status.name: WCCS.Operational, + } + await hass.services.async_call( + SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + assert cluster.write_attributes.call_count == 1 + assert cluster.write_attributes.call_args_list[0] == call( + {WCAttrs.window_covering_mode.name: WCM.LEDs_display_feedback}, + manufacturer=None, + ) + + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OFF + + cluster.write_attributes.reset_mock() + + # test that sending the command again does not result in a write + await hass.services.async_call( + SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True + ) + assert cluster.write_attributes.call_count == 0 + + state = hass.states.get(entity_id) + assert state + assert state.state == STATE_OFF