mirror of
https://github.com/home-assistant/core.git
synced 2025-07-28 07:37:34 +00:00
ZHA cover device support (#30639)
* ZHA cover device support * flake8 * flake8, black * isort * pylint * more test * use zigpy provided functions * black * handle command errors, better state handling * black * more test * lint * Update ZHA cover tests coverage. Co-authored-by: Alexei Chetroi <lexoid@gmail.com>
This commit is contained in:
parent
a0b0dc0aca
commit
4d6417295b
@ -62,4 +62,35 @@ class Shade(ZigbeeChannel):
|
|||||||
class WindowCovering(ZigbeeChannel):
|
class WindowCovering(ZigbeeChannel):
|
||||||
"""Window channel."""
|
"""Window channel."""
|
||||||
|
|
||||||
pass
|
_value_attribute = 8
|
||||||
|
REPORT_CONFIG = (
|
||||||
|
{"attr": "current_position_lift_percentage", "config": REPORT_CONFIG_IMMEDIATE},
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_update(self):
|
||||||
|
"""Retrieve latest state."""
|
||||||
|
result = await self.get_attribute_value(
|
||||||
|
"current_position_lift_percentage", from_cache=False
|
||||||
|
)
|
||||||
|
self.debug("read current position: %s", result)
|
||||||
|
|
||||||
|
async_dispatcher_send(
|
||||||
|
self._zha_device.hass, f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", result
|
||||||
|
)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def attribute_updated(self, attrid, value):
|
||||||
|
"""Handle attribute update from window_covering cluster."""
|
||||||
|
attr_name = self.cluster.attributes.get(attrid, [attrid])[0]
|
||||||
|
self.debug(
|
||||||
|
"Attribute report '%s'[%s] = %s", self.cluster.name, attr_name, value
|
||||||
|
)
|
||||||
|
if attrid == self._value_attribute:
|
||||||
|
async_dispatcher_send(
|
||||||
|
self._zha_device.hass, f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", value
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_initialize(self, from_cache):
|
||||||
|
"""Initialize channel."""
|
||||||
|
await self.get_attribute_value(self._value_attribute, from_cache=from_cache)
|
||||||
|
await super().async_initialize(from_cache)
|
||||||
|
@ -3,6 +3,7 @@ import enum
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
|
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
|
||||||
|
from homeassistant.components.cover import DOMAIN as COVER
|
||||||
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
|
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
|
||||||
from homeassistant.components.fan import DOMAIN as FAN
|
from homeassistant.components.fan import DOMAIN as FAN
|
||||||
from homeassistant.components.light import DOMAIN as LIGHT
|
from homeassistant.components.light import DOMAIN as LIGHT
|
||||||
@ -48,6 +49,7 @@ CHANNEL_ACCELEROMETER = "accelerometer"
|
|||||||
CHANNEL_ATTRIBUTE = "attribute"
|
CHANNEL_ATTRIBUTE = "attribute"
|
||||||
CHANNEL_BASIC = "basic"
|
CHANNEL_BASIC = "basic"
|
||||||
CHANNEL_COLOR = "light_color"
|
CHANNEL_COLOR = "light_color"
|
||||||
|
CHANNEL_COVER = "window_covering"
|
||||||
CHANNEL_DOORLOCK = "door_lock"
|
CHANNEL_DOORLOCK = "door_lock"
|
||||||
CHANNEL_ELECTRICAL_MEASUREMENT = "electrical_measurement"
|
CHANNEL_ELECTRICAL_MEASUREMENT = "electrical_measurement"
|
||||||
CHANNEL_EVENT_RELAY = "event_relay"
|
CHANNEL_EVENT_RELAY = "event_relay"
|
||||||
@ -71,7 +73,7 @@ CLUSTER_COMMANDS_SERVER = "server_commands"
|
|||||||
CLUSTER_TYPE_IN = "in"
|
CLUSTER_TYPE_IN = "in"
|
||||||
CLUSTER_TYPE_OUT = "out"
|
CLUSTER_TYPE_OUT = "out"
|
||||||
|
|
||||||
COMPONENTS = (BINARY_SENSOR, DEVICE_TRACKER, FAN, LIGHT, LOCK, SENSOR, SWITCH)
|
COMPONENTS = (BINARY_SENSOR, COVER, DEVICE_TRACKER, FAN, LIGHT, LOCK, SENSOR, SWITCH)
|
||||||
|
|
||||||
CONF_BAUDRATE = "baudrate"
|
CONF_BAUDRATE = "baudrate"
|
||||||
CONF_DATABASE = "database_path"
|
CONF_DATABASE = "database_path"
|
||||||
|
@ -21,6 +21,7 @@ import zigpy_zigate.api
|
|||||||
import zigpy_zigate.zigbee.application
|
import zigpy_zigate.zigbee.application
|
||||||
|
|
||||||
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
|
from homeassistant.components.binary_sensor import DOMAIN as BINARY_SENSOR
|
||||||
|
from homeassistant.components.cover import DOMAIN as COVER
|
||||||
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
|
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER
|
||||||
from homeassistant.components.fan import DOMAIN as FAN
|
from homeassistant.components.fan import DOMAIN as FAN
|
||||||
from homeassistant.components.light import DOMAIN as LIGHT
|
from homeassistant.components.light import DOMAIN as LIGHT
|
||||||
@ -63,6 +64,7 @@ SINGLE_INPUT_CLUSTER_DEVICE_CLASS = {
|
|||||||
SMARTTHINGS_ACCELERATION_CLUSTER: BINARY_SENSOR,
|
SMARTTHINGS_ACCELERATION_CLUSTER: BINARY_SENSOR,
|
||||||
SMARTTHINGS_HUMIDITY_CLUSTER: SENSOR,
|
SMARTTHINGS_HUMIDITY_CLUSTER: SENSOR,
|
||||||
zcl.clusters.closures.DoorLock: LOCK,
|
zcl.clusters.closures.DoorLock: LOCK,
|
||||||
|
zcl.clusters.closures.WindowCovering: COVER,
|
||||||
zcl.clusters.general.AnalogInput.cluster_id: SENSOR,
|
zcl.clusters.general.AnalogInput.cluster_id: SENSOR,
|
||||||
zcl.clusters.general.MultistateInput.cluster_id: SENSOR,
|
zcl.clusters.general.MultistateInput.cluster_id: SENSOR,
|
||||||
zcl.clusters.general.OnOff: SWITCH,
|
zcl.clusters.general.OnOff: SWITCH,
|
||||||
|
176
homeassistant/components/zha/cover.py
Normal file
176
homeassistant/components/zha/cover.py
Normal file
@ -0,0 +1,176 @@
|
|||||||
|
"""Support for ZHA covers."""
|
||||||
|
from datetime import timedelta
|
||||||
|
import functools
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from zigpy.zcl.foundation import Status
|
||||||
|
|
||||||
|
from homeassistant.components.cover import ATTR_POSITION, DOMAIN, CoverDevice
|
||||||
|
from homeassistant.const import STATE_CLOSED, STATE_CLOSING, STATE_OPEN, STATE_OPENING
|
||||||
|
from homeassistant.core import callback
|
||||||
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||||
|
|
||||||
|
from .core.const import (
|
||||||
|
CHANNEL_COVER,
|
||||||
|
DATA_ZHA,
|
||||||
|
DATA_ZHA_DISPATCHERS,
|
||||||
|
SIGNAL_ATTR_UPDATED,
|
||||||
|
ZHA_DISCOVERY_NEW,
|
||||||
|
)
|
||||||
|
from .core.registries import ZHA_ENTITIES
|
||||||
|
from .entity import ZhaEntity
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
SCAN_INTERVAL = timedelta(minutes=60)
|
||||||
|
STRICT_MATCH = functools.partial(ZHA_ENTITIES.strict_match, DOMAIN)
|
||||||
|
|
||||||
|
|
||||||
|
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
|
||||||
|
"""Old way of setting up Zigbee Home Automation covers."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||||
|
"""Set up the Zigbee Home Automation cover from config entry."""
|
||||||
|
|
||||||
|
async def async_discover(discovery_info):
|
||||||
|
await _async_setup_entities(
|
||||||
|
hass, config_entry, async_add_entities, [discovery_info]
|
||||||
|
)
|
||||||
|
|
||||||
|
unsub = async_dispatcher_connect(
|
||||||
|
hass, ZHA_DISCOVERY_NEW.format(DOMAIN), async_discover
|
||||||
|
)
|
||||||
|
hass.data[DATA_ZHA][DATA_ZHA_DISPATCHERS].append(unsub)
|
||||||
|
|
||||||
|
covers = hass.data.get(DATA_ZHA, {}).get(DOMAIN)
|
||||||
|
if covers is not None:
|
||||||
|
await _async_setup_entities(
|
||||||
|
hass, config_entry, async_add_entities, covers.values()
|
||||||
|
)
|
||||||
|
del hass.data[DATA_ZHA][DOMAIN]
|
||||||
|
|
||||||
|
|
||||||
|
async def _async_setup_entities(
|
||||||
|
hass, config_entry, async_add_entities, discovery_infos
|
||||||
|
):
|
||||||
|
"""Set up the ZHA covers."""
|
||||||
|
entities = []
|
||||||
|
for discovery_info in discovery_infos:
|
||||||
|
zha_dev = discovery_info["zha_device"]
|
||||||
|
channels = discovery_info["channels"]
|
||||||
|
|
||||||
|
entity = ZHA_ENTITIES.get_entity(DOMAIN, zha_dev, channels, ZhaCover)
|
||||||
|
if entity:
|
||||||
|
entities.append(entity(**discovery_info))
|
||||||
|
|
||||||
|
if entities:
|
||||||
|
async_add_entities(entities, update_before_add=True)
|
||||||
|
|
||||||
|
|
||||||
|
@STRICT_MATCH(channel_names=CHANNEL_COVER)
|
||||||
|
class ZhaCover(ZhaEntity, CoverDevice):
|
||||||
|
"""Representation of a ZHA cover."""
|
||||||
|
|
||||||
|
def __init__(self, unique_id, zha_device, channels, **kwargs):
|
||||||
|
"""Init this sensor."""
|
||||||
|
super().__init__(unique_id, zha_device, channels, **kwargs)
|
||||||
|
self._cover_channel = self.cluster_channels.get(CHANNEL_COVER)
|
||||||
|
self._current_position = None
|
||||||
|
|
||||||
|
async def async_added_to_hass(self):
|
||||||
|
"""Run when about to be added to hass."""
|
||||||
|
await super().async_added_to_hass()
|
||||||
|
await self.async_accept_signal(
|
||||||
|
self._cover_channel, SIGNAL_ATTR_UPDATED, self.async_set_position
|
||||||
|
)
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def async_restore_last_state(self, last_state):
|
||||||
|
"""Restore previous state."""
|
||||||
|
self._state = last_state.state
|
||||||
|
if "current_position" in last_state.attributes:
|
||||||
|
self._current_position = last_state.attributes["current_position"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_closed(self):
|
||||||
|
"""Return if the cover is closed."""
|
||||||
|
if self.current_cover_position is None:
|
||||||
|
return None
|
||||||
|
return self.current_cover_position == 0
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_cover_position(self):
|
||||||
|
"""Return the current position of ZHA cover.
|
||||||
|
|
||||||
|
None is unknown, 0 is closed, 100 is fully open.
|
||||||
|
"""
|
||||||
|
return self._current_position
|
||||||
|
|
||||||
|
def async_set_position(self, pos):
|
||||||
|
"""Handle position update from channel."""
|
||||||
|
_LOGGER.debug("setting position: %s", pos)
|
||||||
|
self._current_position = 100 - pos
|
||||||
|
if self._current_position == 0:
|
||||||
|
self._state = STATE_CLOSED
|
||||||
|
elif self._current_position == 100:
|
||||||
|
self._state = STATE_OPEN
|
||||||
|
self.async_schedule_update_ha_state()
|
||||||
|
|
||||||
|
def async_set_state(self, state):
|
||||||
|
"""Handle state update from channel."""
|
||||||
|
_LOGGER.debug("state=%s", state)
|
||||||
|
self._state = state
|
||||||
|
self.async_schedule_update_ha_state()
|
||||||
|
|
||||||
|
async def async_open_cover(self, **kwargs):
|
||||||
|
"""Open the window cover."""
|
||||||
|
res = await self._cover_channel.up_open()
|
||||||
|
if isinstance(res, list) and res[1] is Status.SUCCESS:
|
||||||
|
self.async_set_state(STATE_OPENING)
|
||||||
|
|
||||||
|
async def async_close_cover(self, **kwargs):
|
||||||
|
"""Close the window cover."""
|
||||||
|
res = await self._cover_channel.down_close()
|
||||||
|
if isinstance(res, list) and res[1] is Status.SUCCESS:
|
||||||
|
self.async_set_state(STATE_CLOSING)
|
||||||
|
|
||||||
|
async def async_set_cover_position(self, **kwargs):
|
||||||
|
"""Move the roller shutter to a specific position."""
|
||||||
|
new_pos = kwargs.get(ATTR_POSITION)
|
||||||
|
res = await self._cover_channel.go_to_lift_percentage(100 - new_pos)
|
||||||
|
if isinstance(res, list) and res[1] is Status.SUCCESS:
|
||||||
|
self.async_set_state(
|
||||||
|
STATE_CLOSING if new_pos < self._current_position else STATE_OPENING
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_stop_cover(self, **kwargs):
|
||||||
|
"""Stop the window cover."""
|
||||||
|
res = await self._cover_channel.stop()
|
||||||
|
if isinstance(res, list) and res[1] is Status.SUCCESS:
|
||||||
|
self._state = STATE_OPEN if self._current_position > 0 else STATE_CLOSED
|
||||||
|
self.async_schedule_update_ha_state()
|
||||||
|
|
||||||
|
async def async_update(self):
|
||||||
|
"""Attempt to retrieve the open/close state of the cover."""
|
||||||
|
await super().async_update()
|
||||||
|
await self.async_get_state()
|
||||||
|
|
||||||
|
async def async_get_state(self, from_cache=True):
|
||||||
|
"""Fetch the current state."""
|
||||||
|
_LOGGER.debug("polling current state")
|
||||||
|
if self._cover_channel:
|
||||||
|
pos = await self._cover_channel.get_attribute_value(
|
||||||
|
"current_position_lift_percentage", from_cache=from_cache
|
||||||
|
)
|
||||||
|
_LOGGER.debug("read pos=%s", pos)
|
||||||
|
|
||||||
|
if pos is not None:
|
||||||
|
self._current_position = 100 - pos
|
||||||
|
self._state = (
|
||||||
|
STATE_OPEN if self.current_cover_position > 0 else STATE_CLOSED
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._current_position = None
|
||||||
|
self._state = None
|
129
tests/components/zha/test_cover.py
Normal file
129
tests/components/zha/test_cover.py
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
"""Test zha cover."""
|
||||||
|
from unittest.mock import MagicMock, call, patch
|
||||||
|
|
||||||
|
import zigpy.types
|
||||||
|
import zigpy.zcl.clusters.closures as closures
|
||||||
|
import zigpy.zcl.clusters.general as general
|
||||||
|
import zigpy.zcl.foundation as zcl_f
|
||||||
|
|
||||||
|
from homeassistant.components.cover import DOMAIN
|
||||||
|
from homeassistant.const import STATE_CLOSED, STATE_OPEN, STATE_UNAVAILABLE
|
||||||
|
|
||||||
|
from .common import (
|
||||||
|
async_enable_traffic,
|
||||||
|
async_init_zigpy_device,
|
||||||
|
async_test_device_join,
|
||||||
|
find_entity_id,
|
||||||
|
make_attribute,
|
||||||
|
make_zcl_header,
|
||||||
|
)
|
||||||
|
|
||||||
|
from tests.common import mock_coro
|
||||||
|
|
||||||
|
|
||||||
|
async def test_cover(hass, config_entry, zha_gateway):
|
||||||
|
"""Test zha cover platform."""
|
||||||
|
|
||||||
|
# create zigpy device
|
||||||
|
zigpy_device = await async_init_zigpy_device(
|
||||||
|
hass,
|
||||||
|
[closures.WindowCovering.cluster_id, general.Basic.cluster_id],
|
||||||
|
[],
|
||||||
|
None,
|
||||||
|
zha_gateway,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def get_chan_attr(*args, **kwargs):
|
||||||
|
return 100
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"homeassistant.components.zha.core.channels.ZigbeeChannel.get_attribute_value",
|
||||||
|
new=MagicMock(side_effect=get_chan_attr),
|
||||||
|
) as get_attr_mock:
|
||||||
|
# load up cover domain
|
||||||
|
await hass.config_entries.async_forward_entry_setup(config_entry, DOMAIN)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert get_attr_mock.call_count == 2
|
||||||
|
assert get_attr_mock.call_args[0][0] == "current_position_lift_percentage"
|
||||||
|
|
||||||
|
cluster = zigpy_device.endpoints.get(1).window_covering
|
||||||
|
zha_device = zha_gateway.get_device(zigpy_device.ieee)
|
||||||
|
entity_id = await find_entity_id(DOMAIN, zha_device, hass)
|
||||||
|
assert entity_id is not None
|
||||||
|
|
||||||
|
# test that the cover was created and that it is unavailable
|
||||||
|
assert hass.states.get(entity_id).state == STATE_UNAVAILABLE
|
||||||
|
|
||||||
|
# allow traffic to flow through the gateway and device
|
||||||
|
await async_enable_traffic(hass, zha_gateway, [zha_device])
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
attr = make_attribute(8, 100)
|
||||||
|
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
|
||||||
|
cluster.handle_message(hdr, [[attr]])
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
# test that the state has changed from unavailable to off
|
||||||
|
assert hass.states.get(entity_id).state == STATE_CLOSED
|
||||||
|
|
||||||
|
# test to see if it opens
|
||||||
|
attr = make_attribute(8, 0)
|
||||||
|
hdr = make_zcl_header(zcl_f.Command.Report_Attributes)
|
||||||
|
cluster.handle_message(hdr, [[attr]])
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
assert hass.states.get(entity_id).state == STATE_OPEN
|
||||||
|
|
||||||
|
# close from UI
|
||||||
|
with patch(
|
||||||
|
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x1, zcl_f.Status.SUCCESS])
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
DOMAIN, "close_cover", {"entity_id": entity_id}, blocking=True
|
||||||
|
)
|
||||||
|
assert cluster.request.call_count == 1
|
||||||
|
assert cluster.request.call_args == call(
|
||||||
|
False, 0x1, (), expect_reply=True, manufacturer=None
|
||||||
|
)
|
||||||
|
|
||||||
|
# open from UI
|
||||||
|
with patch(
|
||||||
|
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x0, zcl_f.Status.SUCCESS])
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
DOMAIN, "open_cover", {"entity_id": entity_id}, blocking=True
|
||||||
|
)
|
||||||
|
assert cluster.request.call_count == 1
|
||||||
|
assert cluster.request.call_args == call(
|
||||||
|
False, 0x0, (), expect_reply=True, manufacturer=None
|
||||||
|
)
|
||||||
|
|
||||||
|
# set position UI
|
||||||
|
with patch(
|
||||||
|
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x5, zcl_f.Status.SUCCESS])
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
DOMAIN,
|
||||||
|
"set_cover_position",
|
||||||
|
{"entity_id": entity_id, "position": 47},
|
||||||
|
blocking=True,
|
||||||
|
)
|
||||||
|
assert cluster.request.call_count == 1
|
||||||
|
assert cluster.request.call_args == call(
|
||||||
|
False, 0x5, (zigpy.types.uint8_t,), 53, expect_reply=True, manufacturer=None
|
||||||
|
)
|
||||||
|
|
||||||
|
# stop from UI
|
||||||
|
with patch(
|
||||||
|
"zigpy.zcl.Cluster.request", return_value=mock_coro([0x2, zcl_f.Status.SUCCESS])
|
||||||
|
):
|
||||||
|
await hass.services.async_call(
|
||||||
|
DOMAIN, "stop_cover", {"entity_id": entity_id}, blocking=True
|
||||||
|
)
|
||||||
|
assert cluster.request.call_count == 1
|
||||||
|
assert cluster.request.call_args == call(
|
||||||
|
False, 0x2, (), expect_reply=True, manufacturer=None
|
||||||
|
)
|
||||||
|
|
||||||
|
await async_test_device_join(
|
||||||
|
hass, zha_gateway, closures.WindowCovering.cluster_id, entity_id
|
||||||
|
)
|
Loading…
x
Reference in New Issue
Block a user