Clean up ZHA cover entity and add support for inverting cover entities derived from the window covering cluster (#108238)

This commit is contained in:
David F. Mulcahey 2024-01-30 07:59:00 -05:00 committed by GitHub
parent 14766b6992
commit 92795fecf5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 781 additions and 146 deletions

View File

@ -424,10 +424,18 @@ class ClusterHandler(LogMixin):
@callback
def attribute_updated(self, attrid: int, value: Any, _: Any) -> None:
"""Handle attribute updates on this cluster."""
attr_name = self._get_attribute_name(attrid)
self.debug(
"cluster_handler[%s] attribute_updated - cluster[%s] attr[%s] value[%s]",
self.name,
self.cluster.name,
attr_name,
value,
)
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
attrid,
self._get_attribute_name(attrid),
attr_name,
value,
)

View File

@ -1,10 +1,10 @@
"""Closures cluster handlers module for Zigbee Home Automation."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import Any
import zigpy.zcl
from zigpy.zcl.clusters.closures import DoorLock, Shade, WindowCovering
import zigpy.types as t
from zigpy.zcl.clusters.closures import ConfigStatus, DoorLock, Shade, WindowCovering
from homeassistant.core import callback
@ -12,9 +12,6 @@ from .. import registries
from ..const import REPORT_CONFIG_IMMEDIATE, SIGNAL_ATTR_UPDATED
from . import AttrReportConfig, ClientClusterHandler, ClusterHandler
if TYPE_CHECKING:
from ..endpoint import Endpoint
@registries.ZIGBEE_CLUSTER_HANDLER_REGISTRY.register(DoorLock.cluster_id)
class DoorLockClusterHandler(ClusterHandler):
@ -53,7 +50,7 @@ class DoorLockClusterHandler(ClusterHandler):
command_name = self._cluster.client_commands[command_id].name
if command_name == "operation_event_notification":
if command_name == DoorLock.ClientCommandDefs.operation_event_notification.name:
self.zha_send_event(
command_name,
{
@ -138,62 +135,140 @@ class WindowCoveringClientClusterHandler(ClientClusterHandler):
class WindowCoveringClusterHandler(ClusterHandler):
"""Window cluster handler."""
_value_attribute_lift = (
WindowCovering.AttributeDefs.current_position_lift_percentage.id
)
_value_attribute_tilt = (
WindowCovering.AttributeDefs.current_position_tilt_percentage.id
)
REPORT_CONFIG = (
AttrReportConfig(
attr="current_position_lift_percentage", config=REPORT_CONFIG_IMMEDIATE
attr=WindowCovering.AttributeDefs.current_position_lift_percentage.name,
config=REPORT_CONFIG_IMMEDIATE,
),
AttrReportConfig(
attr="current_position_tilt_percentage", config=REPORT_CONFIG_IMMEDIATE
attr=WindowCovering.AttributeDefs.current_position_tilt_percentage.name,
config=REPORT_CONFIG_IMMEDIATE,
),
)
def __init__(self, cluster: zigpy.zcl.Cluster, endpoint: Endpoint) -> None:
"""Initialize WindowCovering cluster handler."""
super().__init__(cluster, endpoint)
if self.cluster.endpoint.model == "lumi.curtain.agl001":
self.ZCL_INIT_ATTRS = self.ZCL_INIT_ATTRS.copy()
self.ZCL_INIT_ATTRS["window_covering_mode"] = True
ZCL_INIT_ATTRS = {
WindowCovering.AttributeDefs.window_covering_type.name: True,
WindowCovering.AttributeDefs.window_covering_mode.name: True,
WindowCovering.AttributeDefs.config_status.name: True,
WindowCovering.AttributeDefs.installed_closed_limit_lift.name: True,
WindowCovering.AttributeDefs.installed_closed_limit_tilt.name: True,
WindowCovering.AttributeDefs.installed_open_limit_lift.name: True,
WindowCovering.AttributeDefs.installed_open_limit_tilt.name: True,
}
async def async_update(self):
"""Retrieve latest state."""
result = await self.get_attribute_value(
"current_position_lift_percentage", from_cache=False
results = await self.get_attributes(
[
WindowCovering.AttributeDefs.current_position_lift_percentage.name,
WindowCovering.AttributeDefs.current_position_tilt_percentage.name,
],
from_cache=False,
only_cache=False,
)
self.debug("read current position: %s", result)
if result is not None:
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
self._value_attribute_lift,
"current_position_lift_percentage",
result,
self.debug(
"read current_position_lift_percentage and current_position_tilt_percentage - results: %s",
results,
)
if (
results
and results.get(
WindowCovering.AttributeDefs.current_position_lift_percentage.name
)
result = await self.get_attribute_value(
"current_position_tilt_percentage", from_cache=False
)
self.debug("read current tilt position: %s", result)
if result is not None:
is not None
):
# the 100 - value is because we need to invert the value before giving it to the entity
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
self._value_attribute_tilt,
"current_position_tilt_percentage",
result,
WindowCovering.AttributeDefs.current_position_lift_percentage.id,
WindowCovering.AttributeDefs.current_position_lift_percentage.name,
100
- results.get(
WindowCovering.AttributeDefs.current_position_lift_percentage.name
),
)
if (
results
and results.get(
WindowCovering.AttributeDefs.current_position_tilt_percentage.name
)
is not None
):
# the 100 - value is because we need to invert the value before giving it to the entity
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}",
WindowCovering.AttributeDefs.current_position_tilt_percentage.id,
WindowCovering.AttributeDefs.current_position_tilt_percentage.name,
100
- results.get(
WindowCovering.AttributeDefs.current_position_tilt_percentage.name
),
)
@callback
def attribute_updated(self, attrid: int, value: Any, _: Any) -> None:
"""Handle attribute update from window_covering cluster."""
attr_name = self._get_attribute_name(attrid)
self.debug(
"Attribute report '%s'[%s] = %s", self.cluster.name, attr_name, value
@property
def inverted(self):
"""Return true if the window covering is inverted."""
config_status = self.cluster.get(
WindowCovering.AttributeDefs.config_status.name
)
return (
config_status is not None
and ConfigStatus.Open_up_commands_reversed in ConfigStatus(config_status)
)
@property
def current_position_lift_percentage(self) -> t.uint16_t | None:
"""Return the current lift percentage of the window covering."""
lift_percentage = self.cluster.get(
WindowCovering.AttributeDefs.current_position_lift_percentage.name
)
if lift_percentage is not None:
# the 100 - value is because we need to invert the value before giving it to the entity
lift_percentage = 100 - lift_percentage
return lift_percentage
@property
def current_position_tilt_percentage(self) -> t.uint16_t | None:
"""Return the current tilt percentage of the window covering."""
tilt_percentage = self.cluster.get(
WindowCovering.AttributeDefs.current_position_tilt_percentage.name
)
if tilt_percentage is not None:
# the 100 - value is because we need to invert the value before giving it to the entity
tilt_percentage = 100 - tilt_percentage
return tilt_percentage
@property
def installed_open_limit_lift(self) -> t.uint16_t | None:
"""Return the installed open lift limit of the window covering."""
return self.cluster.get(
WindowCovering.AttributeDefs.installed_open_limit_lift.name
)
@property
def installed_closed_limit_lift(self) -> t.uint16_t | None:
"""Return the installed closed lift limit of the window covering."""
return self.cluster.get(
WindowCovering.AttributeDefs.installed_closed_limit_lift.name
)
@property
def installed_open_limit_tilt(self) -> t.uint16_t | None:
"""Return the installed open tilt limit of the window covering."""
return self.cluster.get(
WindowCovering.AttributeDefs.installed_open_limit_tilt.name
)
@property
def installed_closed_limit_tilt(self) -> t.uint16_t | None:
"""Return the installed closed tilt limit of the window covering."""
return self.cluster.get(
WindowCovering.AttributeDefs.installed_closed_limit_tilt.name
)
@property
def window_covering_type(self) -> WindowCovering.WindowCoveringType:
"""Return the window covering type."""
return WindowCovering.WindowCoveringType(
self.cluster.get(WindowCovering.AttributeDefs.window_covering_type.name)
)
if attrid in (self._value_attribute_lift, self._value_attribute_tilt):
self.async_send_signal(
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, attr_name, value
)

View File

@ -4,8 +4,9 @@ from __future__ import annotations
import asyncio
import functools
import logging
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast
from zigpy.zcl.clusters.closures import WindowCovering as WindowCoveringCluster
from zigpy.zcl.foundation import Status
from homeassistant.components.cover import (
@ -14,6 +15,7 @@ from homeassistant.components.cover import (
ATTR_TILT_POSITION,
CoverDeviceClass,
CoverEntity,
CoverEntityFeature,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@ -29,6 +31,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .core import discovery
from .core.cluster_handlers.closures import WindowCoveringClusterHandler
from .core.const import (
CLUSTER_HANDLER_COVER,
CLUSTER_HANDLER_LEVEL,
@ -70,40 +73,145 @@ async def async_setup_entry(
config_entry.async_on_unload(unsub)
WCAttrs = WindowCoveringCluster.AttributeDefs
WCT = WindowCoveringCluster.WindowCoveringType
WCCS = WindowCoveringCluster.ConfigStatus
ZCL_TO_COVER_DEVICE_CLASS = {
WCT.Awning: CoverDeviceClass.AWNING,
WCT.Drapery: CoverDeviceClass.CURTAIN,
WCT.Projector_screen: CoverDeviceClass.SHADE,
WCT.Rollershade: CoverDeviceClass.SHADE,
WCT.Rollershade_two_motors: CoverDeviceClass.SHADE,
WCT.Rollershade_exterior: CoverDeviceClass.SHADE,
WCT.Rollershade_exterior_two_motors: CoverDeviceClass.SHADE,
WCT.Shutter: CoverDeviceClass.SHUTTER,
WCT.Tilt_blind_tilt_only: CoverDeviceClass.BLIND,
WCT.Tilt_blind_tilt_and_lift: CoverDeviceClass.BLIND,
}
@MULTI_MATCH(cluster_handler_names=CLUSTER_HANDLER_COVER)
class ZhaCover(ZhaEntity, CoverEntity):
"""Representation of a ZHA cover."""
_attr_translation_key: str = "cover"
def __init__(self, unique_id, zha_device, cluster_handlers, **kwargs):
"""Init this sensor."""
def __init__(
self,
unique_id: str,
zha_device: ZHADevice,
cluster_handlers: list[ClusterHandler],
**kwargs: Any,
) -> None:
"""Init this cover."""
super().__init__(unique_id, zha_device, cluster_handlers, **kwargs)
self._cover_cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER)
self._current_position = None
self._tilt_position = None
cluster_handler = self.cluster_handlers.get(CLUSTER_HANDLER_COVER)
assert cluster_handler
self._cover_cluster_handler: WindowCoveringClusterHandler = cast(
WindowCoveringClusterHandler, cluster_handler
)
if self._cover_cluster_handler.window_covering_type:
self._attr_device_class: CoverDeviceClass | None = (
ZCL_TO_COVER_DEVICE_CLASS.get(
self._cover_cluster_handler.window_covering_type
)
)
self._attr_supported_features: CoverEntityFeature = (
self._determine_supported_features()
)
self._target_lift_position: int | None = None
self._target_tilt_position: int | None = None
self._determine_initial_state()
def _determine_supported_features(self) -> CoverEntityFeature:
"""Determine the supported cover features."""
supported_features: CoverEntityFeature = (
CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
| CoverEntityFeature.STOP
| CoverEntityFeature.SET_POSITION
)
if (
self._cover_cluster_handler.window_covering_type
and self._cover_cluster_handler.window_covering_type
in (
WCT.Shutter,
WCT.Tilt_blind_tilt_only,
WCT.Tilt_blind_tilt_and_lift,
)
):
supported_features |= CoverEntityFeature.SET_TILT_POSITION
supported_features |= CoverEntityFeature.OPEN_TILT
supported_features |= CoverEntityFeature.CLOSE_TILT
supported_features |= CoverEntityFeature.STOP_TILT
return supported_features
def _determine_initial_state(self) -> None:
"""Determine the initial state of the cover."""
if (
self._cover_cluster_handler.window_covering_type
and self._cover_cluster_handler.window_covering_type
in (
WCT.Shutter,
WCT.Tilt_blind_tilt_only,
WCT.Tilt_blind_tilt_and_lift,
)
):
self._determine_state(
self.current_cover_tilt_position, is_lift_update=False
)
if (
self._cover_cluster_handler.window_covering_type
== WCT.Tilt_blind_tilt_and_lift
):
state = self._state
self._determine_state(self.current_cover_position)
if state == STATE_OPEN and self._state == STATE_CLOSED:
# let the tilt state override the lift state
self._state = STATE_OPEN
else:
self._determine_state(self.current_cover_position)
def _determine_state(self, position_or_tilt, is_lift_update=True) -> None:
"""Determine the state of the cover.
In HA None is unknown, 0 is closed, 100 is fully open.
In ZCL 0 is fully open, 100 is fully closed.
Keep in mind the values have already been flipped to match HA
in the WindowCovering cluster handler
"""
if is_lift_update:
target = self._target_lift_position
current = self.current_cover_position
else:
target = self._target_tilt_position
current = self.current_cover_tilt_position
if position_or_tilt == 100:
self._state = STATE_CLOSED
return
if target is not None and target != current:
# we are mid transition and shouldn't update the state
return
self._state = STATE_OPEN
async def async_added_to_hass(self) -> None:
"""Run when about to be added to hass."""
"""Run when the cover entity is about to be added to hass."""
await super().async_added_to_hass()
self.async_accept_signal(
self._cover_cluster_handler, SIGNAL_ATTR_UPDATED, self.async_set_position
self._cover_cluster_handler, SIGNAL_ATTR_UPDATED, self.zcl_attribute_updated
)
@callback
def async_restore_last_state(self, last_state):
"""Restore previous state."""
self._state = last_state.state
if "current_position" in last_state.attributes:
self._current_position = last_state.attributes["current_position"]
if "current_tilt_position" in last_state.attributes:
self._tilt_position = last_state.attributes[
"current_tilt_position"
] # first allocation activate tilt
@property
def is_closed(self) -> bool | None:
"""Return if the cover is closed."""
"""Return True if the cover is closed.
In HA None is unknown, 0 is closed, 100 is fully open.
In ZCL 0 is fully open, 100 is fully closed.
Keep in mind the values have already been flipped to match HA
in the WindowCovering cluster handler
"""
if self.current_cover_position is None:
return None
return self.current_cover_position == 0
@ -122,39 +230,45 @@ class ZhaCover(ZhaEntity, CoverEntity):
def current_cover_position(self) -> int | None:
"""Return the current position of ZHA cover.
None is unknown, 0 is closed, 100 is fully open.
In HA None is unknown, 0 is closed, 100 is fully open.
In ZCL 0 is fully open, 100 is fully closed.
Keep in mind the values have already been flipped to match HA
in the WindowCovering cluster handler
"""
return self._current_position
return self._cover_cluster_handler.current_position_lift_percentage
@property
def current_cover_tilt_position(self) -> int | None:
"""Return the current tilt position of the cover."""
return self._tilt_position
return self._cover_cluster_handler.current_position_tilt_percentage
@callback
def async_set_position(self, attr_id, attr_name, value):
def zcl_attribute_updated(self, attr_id, attr_name, value):
"""Handle position update from cluster handler."""
_LOGGER.debug("setting position: %s %s %s", attr_id, attr_name, value)
if attr_name == "current_position_lift_percentage":
self._current_position = 100 - value
elif attr_name == "current_position_tilt_percentage":
self._tilt_position = 100 - value
if self._current_position == 0:
self._state = STATE_CLOSED
elif self._current_position == 100:
self._state = STATE_OPEN
if attr_id in (
WCAttrs.current_position_lift_percentage.id,
WCAttrs.current_position_tilt_percentage.id,
):
value = (
self.current_cover_position
if attr_id == WCAttrs.current_position_lift_percentage.id
else self.current_cover_tilt_position
)
self._determine_state(
value,
is_lift_update=attr_id == WCAttrs.current_position_lift_percentage.id,
)
self.async_write_ha_state()
@callback
def async_update_state(self, state):
"""Handle state update from cluster handler."""
_LOGGER.debug("state=%s", state)
"""Handle state update from HA operations below."""
_LOGGER.debug("async_update_state=%s", state)
self._state = state
self.async_write_ha_state()
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the window cover."""
"""Open the cover."""
res = await self._cover_cluster_handler.up_open()
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to open cover: {res[1]}")
@ -162,13 +276,14 @@ class ZhaCover(ZhaEntity, CoverEntity):
async def async_open_cover_tilt(self, **kwargs: Any) -> None:
"""Open the cover tilt."""
# 0 is open in ZCL
res = await self._cover_cluster_handler.go_to_tilt_percentage(0)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to open cover tilt: {res[1]}")
self.async_update_state(STATE_OPENING)
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the window cover."""
"""Close the cover."""
res = await self._cover_cluster_handler.down_close()
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover: {res[1]}")
@ -176,42 +291,63 @@ class ZhaCover(ZhaEntity, CoverEntity):
async def async_close_cover_tilt(self, **kwargs: Any) -> None:
"""Close the cover tilt."""
# 100 is closed in ZCL
res = await self._cover_cluster_handler.go_to_tilt_percentage(100)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to close cover tilt: {res[1]}")
self.async_update_state(STATE_CLOSING)
async def async_set_cover_position(self, **kwargs: Any) -> None:
"""Move the roller shutter to a specific position."""
new_pos = kwargs[ATTR_POSITION]
res = await self._cover_cluster_handler.go_to_lift_percentage(100 - new_pos)
"""Move the cover to a specific position."""
self._target_lift_position = kwargs[ATTR_POSITION]
assert self._target_lift_position is not None
assert self.current_cover_position is not None
# the 100 - value is because we need to invert the value before giving it to ZCL
res = await self._cover_cluster_handler.go_to_lift_percentage(
100 - self._target_lift_position
)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to set cover position: {res[1]}")
self.async_update_state(
STATE_CLOSING if new_pos < self._current_position else STATE_OPENING
STATE_CLOSING
if self._target_lift_position < self.current_cover_position
else STATE_OPENING
)
async def async_set_cover_tilt_position(self, **kwargs: Any) -> None:
"""Move the cover til to a specific position."""
new_pos = kwargs[ATTR_TILT_POSITION]
res = await self._cover_cluster_handler.go_to_tilt_percentage(100 - new_pos)
"""Move the cover tilt to a specific position."""
self._target_tilt_position = kwargs[ATTR_TILT_POSITION]
assert self._target_tilt_position is not None
assert self.current_cover_tilt_position is not None
# the 100 - value is because we need to invert the value before giving it to ZCL
res = await self._cover_cluster_handler.go_to_tilt_percentage(
100 - self._target_tilt_position
)
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to set cover tilt position: {res[1]}")
self.async_update_state(
STATE_CLOSING if new_pos < self._tilt_position else STATE_OPENING
STATE_CLOSING
if self._target_tilt_position < self.current_cover_tilt_position
else STATE_OPENING
)
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the window cover."""
"""Stop the cover."""
res = await self._cover_cluster_handler.stop()
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to stop cover: {res[1]}")
self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED
self._target_lift_position = self.current_cover_position
self._determine_state(self.current_cover_position)
self.async_write_ha_state()
async def async_stop_cover_tilt(self, **kwargs: Any) -> None:
"""Stop the cover tilt."""
await self.async_stop_cover()
res = await self._cover_cluster_handler.stop()
if res[1] is not Status.SUCCESS:
raise HomeAssistantError(f"Failed to stop cover: {res[1]}")
self._target_tilt_position = self.current_cover_tilt_position
self._determine_state(self.current_cover_tilt_position, is_lift_update=False)
self.async_write_ha_state()
@MULTI_MATCH(

View File

@ -905,6 +905,9 @@
"invert_switch": {
"name": "Invert switch"
},
"inverted": {
"name": "Inverted"
},
"smart_bulb_mode": {
"name": "Smart bulb mode"
},

View File

@ -6,6 +6,7 @@ import logging
from typing import TYPE_CHECKING, Any, Self
from zhaquirks.quirk_ids import TUYA_PLUG_ONOFF
from zigpy.zcl.clusters.closures import ConfigStatus, WindowCovering, WindowCoveringMode
from zigpy.zcl.clusters.general import OnOff
from zigpy.zcl.foundation import Status
@ -19,6 +20,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback
from .core import discovery
from .core.const import (
CLUSTER_HANDLER_BASIC,
CLUSTER_HANDLER_COVER,
CLUSTER_HANDLER_INOVELLI,
CLUSTER_HANDLER_ON_OFF,
SIGNAL_ADD_ENTITIES,
@ -588,3 +590,62 @@ class AqaraBuzzerManualAlarm(ZHASwitchConfigurationEntity):
_attribute_name = "buzzer_manual_alarm"
_attr_translation_key = "buzzer_manual_alarm"
_attr_icon: str = "mdi:bullhorn"
@CONFIG_DIAGNOSTIC_MATCH(cluster_handler_names=CLUSTER_HANDLER_COVER)
class WindowCoveringInversionSwitch(ZHASwitchConfigurationEntity):
"""Representation of a switch that controls inversion for window covering devices.
This is necessary because this cluster uses 2 attributes to control inversion.
"""
_unique_id_suffix = "inverted"
_attribute_name = WindowCovering.AttributeDefs.config_status.name
_attr_translation_key = "inverted"
_attr_icon: str = "mdi:arrow-up-down"
@property
def is_on(self) -> bool:
"""Return if the switch is on based on the statemachine."""
config_status = ConfigStatus(
self._cluster_handler.cluster.get(self._attribute_name)
)
return ConfigStatus.Open_up_commands_reversed in config_status
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the entity on."""
await self._async_on_off(True)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
await self._async_on_off(False)
async def async_update(self) -> None:
"""Attempt to retrieve the state of the entity."""
self.debug("Polling current state")
await self._cluster_handler.get_attributes(
[
self._attribute_name,
WindowCovering.AttributeDefs.window_covering_mode.name,
],
from_cache=False,
only_cache=False,
)
self.async_write_ha_state()
async def _async_on_off(self, invert: bool) -> None:
"""Turn the entity on or off."""
name: str = WindowCovering.AttributeDefs.window_covering_mode.name
current_mode: WindowCoveringMode = WindowCoveringMode(
self._cluster_handler.cluster.get(name)
)
send_command: bool = False
if invert and WindowCoveringMode.Motor_direction_reversed not in current_mode:
current_mode |= WindowCoveringMode.Motor_direction_reversed
send_command = True
elif not invert and WindowCoveringMode.Motor_direction_reversed in current_mode:
current_mode &= ~WindowCoveringMode.Motor_direction_reversed
send_command = True
if send_command:
await self._cluster_handler.write_attributes_safe({name: current_mode})
await self.async_update()

View File

@ -28,7 +28,9 @@ from homeassistant.components.zha.core.const import ZHA_EVENT
from homeassistant.const import (
ATTR_COMMAND,
STATE_CLOSED,
STATE_CLOSING,
STATE_OPEN,
STATE_OPENING,
STATE_UNAVAILABLE,
Platform,
)
@ -42,6 +44,7 @@ from .common import (
find_entity_id,
make_zcl_header,
send_attributes_report,
update_attribute_cache,
)
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
@ -131,21 +134,40 @@ def zigpy_keen_vent(zigpy_device_mock):
)
async def test_cover(
WCAttrs = closures.WindowCovering.AttributeDefs
WCCmds = closures.WindowCovering.ServerCommandDefs
WCT = closures.WindowCovering.WindowCoveringType
WCCS = closures.WindowCovering.ConfigStatus
async def test_cover_non_tilt_initial_state(
hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device
) -> None:
"""Test ZHA cover platform."""
# load up cover domain
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
"current_position_lift_percentage": 65,
"current_position_tilt_percentage": 42,
WCAttrs.current_position_lift_percentage.name: 0,
WCAttrs.window_covering_type.name: WCT.Drapery,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
update_attribute_cache(cluster)
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert cluster.read_attributes.call_count == 1
assert "current_position_lift_percentage" in cluster.read_attributes.call_args[0][0]
assert "current_position_tilt_percentage" in cluster.read_attributes.call_args[0][0]
assert (
not zha_device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)
assert cluster.read_attributes.call_count == 3
assert (
WCAttrs.current_position_lift_percentage.name
in cluster.read_attributes.call_args[0][0]
)
assert (
WCAttrs.current_position_tilt_percentage.name
in cluster.read_attributes.call_args[0][0]
)
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
@ -161,27 +183,86 @@ async def test_cover(
# test update
prev_call_count = cluster.read_attributes.call_count
await async_update_entity(hass, entity_id)
assert cluster.read_attributes.call_count == prev_call_count + 2
assert cluster.read_attributes.call_count == prev_call_count + 1
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 35
assert state.attributes[ATTR_CURRENT_POSITION] == 100
async def test_cover(
hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device
) -> None:
"""Test ZHA cover platform."""
# load up cover domain
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 0,
WCAttrs.current_position_tilt_percentage.name: 42,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
update_attribute_cache(cluster)
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert (
not zha_device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)
assert cluster.read_attributes.call_count == 3
assert (
WCAttrs.current_position_lift_percentage.name
in cluster.read_attributes.call_args[0][0]
)
assert (
WCAttrs.current_position_tilt_percentage.name
in cluster.read_attributes.call_args[0][0]
)
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the cover was created and that it is unavailable
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test update
prev_call_count = cluster.read_attributes.call_count
await async_update_entity(hass, entity_id)
assert cluster.read_attributes.call_count == prev_call_count + 1
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OPEN
assert state.attributes[ATTR_CURRENT_POSITION] == 100
assert state.attributes[ATTR_CURRENT_TILT_POSITION] == 58
# test that the state has changed from unavailable to off
await send_attributes_report(hass, cluster, {0: 0, 8: 100, 1: 1})
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 100}
)
assert hass.states.get(entity_id).state == STATE_CLOSED
# test to see if it opens
await send_attributes_report(hass, cluster, {0: 1, 8: 0, 1: 100})
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 0}
)
assert hass.states.get(entity_id).state == STATE_OPEN
# test that the state remains after tilting to 100%
await send_attributes_report(hass, cluster, {0: 0, 9: 100, 1: 1})
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 100}
)
assert hass.states.get(entity_id).state == STATE_OPEN
# test to see the state remains after tilting to 0%
await send_attributes_report(hass, cluster, {0: 1, 9: 0, 1: 100})
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 0}
)
assert hass.states.get(entity_id).state == STATE_OPEN
# close from UI
@ -192,9 +273,17 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x01
assert cluster.request.call_args[0][2].command.name == "down_close"
assert cluster.request.call_args[0][2].command.name == WCCmds.down_close.name
assert cluster.request.call_args[1]["expect_reply"] is True
assert hass.states.get(entity_id).state == STATE_CLOSING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 100}
)
assert hass.states.get(entity_id).state == STATE_CLOSED
with patch("zigpy.zcl.Cluster.request", return_value=[0x1, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
@ -205,10 +294,21 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert (
cluster.request.call_args[0][2].command.name
== WCCmds.go_to_tilt_percentage.name
)
assert cluster.request.call_args[0][3] == 100
assert cluster.request.call_args[1]["expect_reply"] is True
assert hass.states.get(entity_id).state == STATE_CLOSING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 100}
)
assert hass.states.get(entity_id).state == STATE_CLOSED
# open from UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -217,9 +317,17 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x00
assert cluster.request.call_args[0][2].command.name == "up_open"
assert cluster.request.call_args[0][2].command.name == WCCmds.up_open.name
assert cluster.request.call_args[1]["expect_reply"] is True
assert hass.states.get(entity_id).state == STATE_OPENING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 0}
)
assert hass.states.get(entity_id).state == STATE_OPEN
with patch("zigpy.zcl.Cluster.request", return_value=[0x0, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
@ -230,10 +338,21 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert (
cluster.request.call_args[0][2].command.name
== WCCmds.go_to_tilt_percentage.name
)
assert cluster.request.call_args[0][3] == 0
assert cluster.request.call_args[1]["expect_reply"] is True
assert hass.states.get(entity_id).state == STATE_OPENING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 0}
)
assert hass.states.get(entity_id).state == STATE_OPEN
# set position UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -245,10 +364,27 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x05
assert cluster.request.call_args[0][2].command.name == "go_to_lift_percentage"
assert (
cluster.request.call_args[0][2].command.name
== WCCmds.go_to_lift_percentage.name
)
assert cluster.request.call_args[0][3] == 53
assert cluster.request.call_args[1]["expect_reply"] is True
assert hass.states.get(entity_id).state == STATE_CLOSING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 35}
)
assert hass.states.get(entity_id).state == STATE_CLOSING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 53}
)
assert hass.states.get(entity_id).state == STATE_OPEN
with patch("zigpy.zcl.Cluster.request", return_value=[0x5, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
COVER_DOMAIN,
@ -259,10 +395,27 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert (
cluster.request.call_args[0][2].command.name
== WCCmds.go_to_tilt_percentage.name
)
assert cluster.request.call_args[0][3] == 53
assert cluster.request.call_args[1]["expect_reply"] is True
assert hass.states.get(entity_id).state == STATE_CLOSING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 35}
)
assert hass.states.get(entity_id).state == STATE_CLOSING
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_lift_percentage.id: 53}
)
assert hass.states.get(entity_id).state == STATE_OPEN
# stop from UI
with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]):
await hass.services.async_call(
@ -271,7 +424,7 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x02
assert cluster.request.call_args[0][2].command.name == "stop"
assert cluster.request.call_args[0][2].command.name == WCCmds.stop.name
assert cluster.request.call_args[1]["expect_reply"] is True
with patch("zigpy.zcl.Cluster.request", return_value=[0x2, zcl_f.Status.SUCCESS]):
@ -284,11 +437,11 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x02
assert cluster.request.call_args[0][2].command.name == "stop"
assert cluster.request.call_args[0][2].command.name == WCCmds.stop.name
assert cluster.request.call_args[1]["expect_reply"] is True
# test rejoin
cluster.PLUGGED_ATTR_READS = {"current_position_lift_percentage": 0}
cluster.PLUGGED_ATTR_READS = {WCAttrs.current_position_lift_percentage.name: 0}
await async_test_rejoin(hass, zigpy_cover_device, [cluster], (1,))
assert hass.states.get(entity_id).state == STATE_OPEN
@ -303,7 +456,10 @@ async def test_cover(
assert cluster.request.call_count == 1
assert cluster.request.call_args[0][0] is False
assert cluster.request.call_args[0][1] == 0x08
assert cluster.request.call_args[0][2].command.name == "go_to_tilt_percentage"
assert (
cluster.request.call_args[0][2].command.name
== WCCmds.go_to_tilt_percentage.name
)
assert cluster.request.call_args[0][3] == 100
assert cluster.request.call_args[1]["expect_reply"] is True
@ -314,11 +470,12 @@ async def test_cover_failures(
"""Test ZHA cover platform failure cases."""
# load up cover domain
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
"current_position_lift_percentage": None,
"current_position_tilt_percentage": 42,
WCAttrs.current_position_tilt_percentage.name: 42,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift,
}
update_attribute_cache(cluster)
zha_device = await zha_device_joined_restored(zigpy_cover_device)
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
@ -331,7 +488,7 @@ async def test_cover_failures(
# test update returned None
prev_call_count = cluster.read_attributes.call_count
await async_update_entity(hass, entity_id)
assert cluster.read_attributes.call_count == prev_call_count + 2
assert cluster.read_attributes.call_count == prev_call_count + 1
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
@ -493,6 +650,27 @@ async def test_cover_failures(
== closures.WindowCovering.ServerCommandDefs.stop.id
)
# stop from UI
with patch(
"zigpy.zcl.Cluster.request",
return_value=Default_Response(
command_id=closures.WindowCovering.ServerCommandDefs.stop.id,
status=zcl_f.Status.UNSUP_CLUSTER_COMMAND,
),
):
with pytest.raises(HomeAssistantError, match=r"Failed to stop cover"):
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_STOP_COVER_TILT,
{"entity_id": entity_id},
blocking=True,
)
assert cluster.request.call_count == 1
assert (
cluster.request.call_args[0][1]
== closures.WindowCovering.ServerCommandDefs.stop.id
)
async def test_shade(
hass: HomeAssistant, zha_device_joined_restored, zigpy_shade_device
@ -502,8 +680,8 @@ async def test_shade(
# load up cover domain
zha_device = await zha_device_joined_restored(zigpy_shade_device)
cluster_on_off = zigpy_shade_device.endpoints.get(1).on_off
cluster_level = zigpy_shade_device.endpoints.get(1).level
cluster_on_off = zigpy_shade_device.endpoints[1].on_off
cluster_level = zigpy_shade_device.endpoints[1].level
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None
@ -700,16 +878,13 @@ async def test_cover_restore_state(
hass: HomeAssistant, zha_device_restored, zigpy_cover_device
) -> None:
"""Ensure states are restored on startup."""
mock_restore_cache(
hass,
(
State(
"cover.fakemanufacturer_fakemodel_cover",
STATE_OPEN,
{ATTR_CURRENT_POSITION: 50, ATTR_CURRENT_TILT_POSITION: 42},
),
),
)
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 50,
WCAttrs.current_position_tilt_percentage.name: 42,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift,
}
update_attribute_cache(cluster)
hass.set_state(CoreState.starting)
@ -719,8 +894,8 @@ async def test_cover_restore_state(
# test that the cover was created and that it is available
assert hass.states.get(entity_id).state == STATE_OPEN
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 50
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_TILT_POSITION] == 42
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_POSITION] == 100 - 50
assert hass.states.get(entity_id).attributes[ATTR_CURRENT_TILT_POSITION] == 100 - 42
async def test_keen_vent(
@ -731,8 +906,8 @@ async def test_keen_vent(
# load up cover domain
zha_device = await zha_device_joined_restored(zigpy_keen_vent)
cluster_on_off = zigpy_keen_vent.endpoints.get(1).on_off
cluster_level = zigpy_keen_vent.endpoints.get(1).level
cluster_on_off = zigpy_keen_vent.endpoints[1].on_off
cluster_level = zigpy_keen_vent.endpoints[1].level
entity_id = find_entity_id(Platform.COVER, zha_device, hass)
assert entity_id is not None

View File

@ -1,5 +1,5 @@
"""Test ZHA switch."""
from unittest.mock import call, patch
from unittest.mock import AsyncMock, call, patch
import pytest
from zhaquirks.const import (
@ -13,6 +13,7 @@ from zigpy.exceptions import ZigbeeException
import zigpy.profiles.zha as zha
from zigpy.quirks import CustomCluster, CustomDevice
import zigpy.types as t
import zigpy.zcl.clusters.closures as closures
import zigpy.zcl.clusters.general as general
from zigpy.zcl.clusters.manufacturer_specific import ManufacturerSpecificCluster
import zigpy.zcl.foundation as zcl_f
@ -23,6 +24,7 @@ from homeassistant.components.zha.core.helpers import get_zha_gateway
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_component import async_update_entity
from homeassistant.setup import async_setup_component
from .common import (
@ -32,8 +34,9 @@ from .common import (
async_wait_for_updates,
find_entity_id,
send_attributes_report,
update_attribute_cache,
)
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_TYPE
from .conftest import SIG_EP_INPUT, SIG_EP_OUTPUT, SIG_EP_PROFILE, SIG_EP_TYPE
ON = 1
OFF = 0
@ -69,6 +72,24 @@ def zigpy_device(zigpy_device_mock):
return zigpy_device_mock(endpoints)
@pytest.fixture
def zigpy_cover_device(zigpy_device_mock):
"""Zigpy cover device."""
endpoints = {
1: {
SIG_EP_PROFILE: zha.PROFILE_ID,
SIG_EP_TYPE: zha.DeviceType.WINDOW_COVERING_DEVICE,
SIG_EP_INPUT: [
general.Basic.cluster_id,
closures.WindowCovering.cluster_id,
],
SIG_EP_OUTPUT: [],
}
}
return zigpy_device_mock(endpoints)
@pytest.fixture
async def coordinator(hass, zigpy_device_mock, zha_device_joined):
"""Test ZHA light platform."""
@ -136,7 +157,7 @@ async def test_switch(
"""Test ZHA switch platform."""
zha_device = await zha_device_joined_restored(zigpy_device)
cluster = zigpy_device.endpoints.get(1).on_off
cluster = zigpy_device.endpoints[1].on_off
entity_id = find_entity_id(Platform.SWITCH, zha_device, hass)
assert entity_id is not None
@ -177,6 +198,9 @@ async def test_switch(
manufacturer=None,
tsn=None,
)
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_ON
# turn off from HA
with patch(
@ -196,6 +220,9 @@ async def test_switch(
manufacturer=None,
tsn=None,
)
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OFF
await async_setup_component(hass, "homeassistant", {})
@ -338,6 +365,20 @@ async def test_zha_group_switch_entity(
)
assert hass.states.get(entity_id).state == STATE_ON
# test turn off failure case
hold_off = group_cluster_on_off.off
group_cluster_on_off.off = AsyncMock(return_value=[0x01, zcl_f.Status.FAILURE])
# turn off via UI
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert len(group_cluster_on_off.off.mock_calls) == 1
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_ON
group_cluster_on_off.off = hold_off
# turn off from HA
with patch(
"zigpy.zcl.Cluster.request",
@ -358,6 +399,20 @@ async def test_zha_group_switch_entity(
)
assert hass.states.get(entity_id).state == STATE_OFF
# test turn on failure case
hold_on = group_cluster_on_off.on
group_cluster_on_off.on = AsyncMock(return_value=[0x01, zcl_f.Status.FAILURE])
# turn on via UI
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert len(group_cluster_on_off.on.mock_calls) == 1
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OFF
group_cluster_on_off.on = hold_on
# test some of the group logic to make sure we key off states correctly
await send_attributes_report(hass, dev1_cluster_on_off, {0: 1})
await send_attributes_report(hass, dev2_cluster_on_off, {0: 1})
@ -391,7 +446,7 @@ async def test_switch_configurable(
"""Test ZHA configurable switch platform."""
zha_device = await zha_device_joined_restored(zigpy_device_tuya)
cluster = zigpy_device_tuya.endpoints.get(1).tuya_manufacturer
cluster = zigpy_device_tuya.endpoints[1].tuya_manufacturer
entity_id = find_entity_id(Platform.SWITCH, zha_device, hass)
assert entity_id is not None
@ -507,3 +562,125 @@ async def test_switch_configurable(
# test joining a new switch to the network and HA
await async_test_rejoin(hass, zigpy_device_tuya, [cluster], (0,))
WCAttrs = closures.WindowCovering.AttributeDefs
WCT = closures.WindowCovering.WindowCoveringType
WCCS = closures.WindowCovering.ConfigStatus
WCM = closures.WindowCovering.WindowCoveringMode
async def test_cover_inversion_switch(
hass: HomeAssistant, zha_device_joined_restored, zigpy_cover_device
) -> None:
"""Test ZHA cover platform."""
# load up cover domain
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 65,
WCAttrs.current_position_tilt_percentage.name: 42,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
WCAttrs.window_covering_mode.name: WCM(WCM.LEDs_display_feedback),
}
update_attribute_cache(cluster)
zha_device = await zha_device_joined_restored(zigpy_cover_device)
assert (
not zha_device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)
assert cluster.read_attributes.call_count == 3
assert (
WCAttrs.current_position_lift_percentage.name
in cluster.read_attributes.call_args[0][0]
)
assert (
WCAttrs.current_position_tilt_percentage.name
in cluster.read_attributes.call_args[0][0]
)
entity_id = find_entity_id(Platform.SWITCH, zha_device, hass)
assert entity_id is not None
await async_enable_traffic(hass, [zha_device], enabled=False)
# test that the cover was created and that it is unavailable
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_UNAVAILABLE
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, [zha_device])
await hass.async_block_till_done()
# test update
prev_call_count = cluster.read_attributes.call_count
await async_update_entity(hass, entity_id)
assert cluster.read_attributes.call_count == prev_call_count + 1
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OFF
# test to see the state remains after tilting to 0%
await send_attributes_report(
hass, cluster, {WCAttrs.current_position_tilt_percentage.id: 0}
)
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OFF
with patch(
"zigpy.zcl.Cluster.write_attributes", return_value=[0x1, zcl_f.Status.SUCCESS]
):
cluster.PLUGGED_ATTR_READS = {
WCAttrs.config_status.name: WCCS.Operational
| WCCS.Open_up_commands_reversed,
}
# turn on from UI
await hass.services.async_call(
SWITCH_DOMAIN, "turn_on", {"entity_id": entity_id}, blocking=True
)
assert cluster.write_attributes.call_count == 1
assert cluster.write_attributes.call_args_list[0] == call(
{
WCAttrs.window_covering_mode.name: WCM.Motor_direction_reversed
| WCM.LEDs_display_feedback
},
manufacturer=None,
)
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_ON
cluster.write_attributes.reset_mock()
# turn off from UI
cluster.PLUGGED_ATTR_READS = {
WCAttrs.config_status.name: WCCS.Operational,
}
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert cluster.write_attributes.call_count == 1
assert cluster.write_attributes.call_args_list[0] == call(
{WCAttrs.window_covering_mode.name: WCM.LEDs_display_feedback},
manufacturer=None,
)
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OFF
cluster.write_attributes.reset_mock()
# test that sending the command again does not result in a write
await hass.services.async_call(
SWITCH_DOMAIN, "turn_off", {"entity_id": entity_id}, blocking=True
)
assert cluster.write_attributes.call_count == 0
state = hass.states.get(entity_id)
assert state
assert state.state == STATE_OFF