mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 05:07:41 +00:00
Update ZHA switch entities to leverage Zigpy cache appropriately (#71062)
This commit is contained in:
parent
682ac52a20
commit
755020ff63
@ -6,6 +6,7 @@ from collections.abc import Coroutine
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import zigpy.exceptions
|
import zigpy.exceptions
|
||||||
|
import zigpy.types as t
|
||||||
from zigpy.zcl.clusters import general
|
from zigpy.zcl.clusters import general
|
||||||
from zigpy.zcl.foundation import Status
|
from zigpy.zcl.foundation import Status
|
||||||
|
|
||||||
@ -300,7 +301,6 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Initialize OnOffChannel."""
|
"""Initialize OnOffChannel."""
|
||||||
super().__init__(cluster, ch_pool)
|
super().__init__(cluster, ch_pool)
|
||||||
self._state = None
|
|
||||||
self._off_listener = None
|
self._off_listener = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -314,9 +314,9 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
cmd = parse_and_log_command(self, tsn, command_id, args)
|
cmd = parse_and_log_command(self, tsn, command_id, args)
|
||||||
|
|
||||||
if cmd in ("off", "off_with_effect"):
|
if cmd in ("off", "off_with_effect"):
|
||||||
self.attribute_updated(self.ON_OFF, False)
|
self.cluster.update_attribute(self.ON_OFF, t.Bool.false)
|
||||||
elif cmd in ("on", "on_with_recall_global_scene"):
|
elif cmd in ("on", "on_with_recall_global_scene"):
|
||||||
self.attribute_updated(self.ON_OFF, True)
|
self.cluster.update_attribute(self.ON_OFF, t.Bool.true)
|
||||||
elif cmd == "on_with_timed_off":
|
elif cmd == "on_with_timed_off":
|
||||||
should_accept = args[0]
|
should_accept = args[0]
|
||||||
on_time = args[1]
|
on_time = args[1]
|
||||||
@ -325,7 +325,7 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
if self._off_listener is not None:
|
if self._off_listener is not None:
|
||||||
self._off_listener()
|
self._off_listener()
|
||||||
self._off_listener = None
|
self._off_listener = None
|
||||||
self.attribute_updated(self.ON_OFF, True)
|
self.cluster.update_attribute(self.ON_OFF, t.Bool.true)
|
||||||
if on_time > 0:
|
if on_time > 0:
|
||||||
self._off_listener = async_call_later(
|
self._off_listener = async_call_later(
|
||||||
self._ch_pool.hass,
|
self._ch_pool.hass,
|
||||||
@ -333,13 +333,13 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
self.set_to_off,
|
self.set_to_off,
|
||||||
)
|
)
|
||||||
elif cmd == "toggle":
|
elif cmd == "toggle":
|
||||||
self.attribute_updated(self.ON_OFF, not bool(self._state))
|
self.cluster.update_attribute(self.ON_OFF, not bool(self.on_off))
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def set_to_off(self, *_):
|
def set_to_off(self, *_):
|
||||||
"""Set the state to off."""
|
"""Set the state to off."""
|
||||||
self._off_listener = None
|
self._off_listener = None
|
||||||
self.attribute_updated(self.ON_OFF, False)
|
self.cluster.update_attribute(self.ON_OFF, t.Bool.false)
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def attribute_updated(self, attrid, value):
|
def attribute_updated(self, attrid, value):
|
||||||
@ -348,11 +348,6 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
self.async_send_signal(
|
self.async_send_signal(
|
||||||
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, "on_off", value
|
f"{self.unique_id}_{SIGNAL_ATTR_UPDATED}", attrid, "on_off", value
|
||||||
)
|
)
|
||||||
self._state = bool(value)
|
|
||||||
|
|
||||||
async def async_initialize_channel_specific(self, from_cache: bool) -> None:
|
|
||||||
"""Initialize channel."""
|
|
||||||
self._state = self.on_off
|
|
||||||
|
|
||||||
async def async_update(self):
|
async def async_update(self):
|
||||||
"""Initialize channel."""
|
"""Initialize channel."""
|
||||||
@ -360,9 +355,7 @@ class OnOffChannel(ZigbeeChannel):
|
|||||||
return
|
return
|
||||||
from_cache = not self._ch_pool.is_mains_powered
|
from_cache = not self._ch_pool.is_mains_powered
|
||||||
self.debug("attempting to update onoff state - from cache: %s", from_cache)
|
self.debug("attempting to update onoff state - from cache: %s", from_cache)
|
||||||
state = await self.get_attribute_value(self.ON_OFF, from_cache=from_cache)
|
await self.get_attribute_value(self.ON_OFF, from_cache=from_cache)
|
||||||
if state is not None:
|
|
||||||
self._state = bool(state)
|
|
||||||
await super().async_update()
|
await super().async_update()
|
||||||
|
|
||||||
|
|
||||||
|
@ -46,21 +46,73 @@ async def async_setup_entry(
|
|||||||
config_entry.async_on_unload(unsub)
|
config_entry.async_on_unload(unsub)
|
||||||
|
|
||||||
|
|
||||||
class BaseSwitch(SwitchEntity):
|
@STRICT_MATCH(channel_names=CHANNEL_ON_OFF)
|
||||||
"""Common base class for zha switches."""
|
class Switch(ZhaEntity, SwitchEntity):
|
||||||
|
"""ZHA switch."""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, unique_id, zha_device, channels, **kwargs):
|
||||||
"""Initialize the ZHA switch."""
|
"""Initialize the ZHA switch."""
|
||||||
self._on_off_channel = None
|
super().__init__(unique_id, zha_device, channels, **kwargs)
|
||||||
self._state = None
|
self._on_off_channel = self.cluster_channels.get(CHANNEL_ON_OFF)
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_on(self) -> bool:
|
def is_on(self) -> bool:
|
||||||
"""Return if the switch is on based on the statemachine."""
|
"""Return if the switch is on based on the statemachine."""
|
||||||
if self._state is None:
|
if self._on_off_channel.on_off is None:
|
||||||
return False
|
return False
|
||||||
return self._state
|
return self._on_off_channel.on_off
|
||||||
|
|
||||||
|
async def async_turn_on(self, **kwargs) -> None:
|
||||||
|
"""Turn the entity on."""
|
||||||
|
result = await self._on_off_channel.on()
|
||||||
|
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
|
||||||
|
return
|
||||||
|
self.async_write_ha_state()
|
||||||
|
|
||||||
|
async def async_turn_off(self, **kwargs) -> None:
|
||||||
|
"""Turn the entity off."""
|
||||||
|
result = await self._on_off_channel.off()
|
||||||
|
if isinstance(result, Exception) or result[1] is not Status.SUCCESS:
|
||||||
|
return
|
||||||
|
self.async_write_ha_state()
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def async_set_state(self, attr_id: int, attr_name: str, value: Any):
|
||||||
|
"""Handle state update from channel."""
|
||||||
|
self.async_write_ha_state()
|
||||||
|
|
||||||
|
async def async_added_to_hass(self) -> None:
|
||||||
|
"""Run when about to be added to hass."""
|
||||||
|
await super().async_added_to_hass()
|
||||||
|
self.async_accept_signal(
|
||||||
|
self._on_off_channel, SIGNAL_ATTR_UPDATED, self.async_set_state
|
||||||
|
)
|
||||||
|
|
||||||
|
async def async_update(self) -> None:
|
||||||
|
"""Attempt to retrieve on off state from the switch."""
|
||||||
|
await super().async_update()
|
||||||
|
if self._on_off_channel:
|
||||||
|
await self._on_off_channel.get_attribute_value("on_off", from_cache=False)
|
||||||
|
|
||||||
|
|
||||||
|
@GROUP_MATCH()
|
||||||
|
class SwitchGroup(ZhaGroupEntity, SwitchEntity):
|
||||||
|
"""Representation of a switch group."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, entity_ids: list[str], unique_id: str, group_id: int, zha_device, **kwargs
|
||||||
|
) -> None:
|
||||||
|
"""Initialize a switch group."""
|
||||||
|
super().__init__(entity_ids, unique_id, group_id, zha_device, **kwargs)
|
||||||
|
self._available: bool
|
||||||
|
self._state: bool
|
||||||
|
group = self.zha_device.gateway.get_group(self._group_id)
|
||||||
|
self._on_off_channel = group.endpoint[OnOff.cluster_id]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_on(self) -> bool:
|
||||||
|
"""Return if the switch is on based on the statemachine."""
|
||||||
|
return bool(self._state)
|
||||||
|
|
||||||
async def async_turn_on(self, **kwargs) -> None:
|
async def async_turn_on(self, **kwargs) -> None:
|
||||||
"""Turn the entity on."""
|
"""Turn the entity on."""
|
||||||
@ -78,56 +130,6 @@ class BaseSwitch(SwitchEntity):
|
|||||||
self._state = False
|
self._state = False
|
||||||
self.async_write_ha_state()
|
self.async_write_ha_state()
|
||||||
|
|
||||||
|
|
||||||
@STRICT_MATCH(channel_names=CHANNEL_ON_OFF)
|
|
||||||
class Switch(BaseSwitch, ZhaEntity):
|
|
||||||
"""ZHA switch."""
|
|
||||||
|
|
||||||
def __init__(self, unique_id, zha_device, channels, **kwargs):
|
|
||||||
"""Initialize the ZHA switch."""
|
|
||||||
super().__init__(unique_id, zha_device, channels, **kwargs)
|
|
||||||
self._on_off_channel = self.cluster_channels.get(CHANNEL_ON_OFF)
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def async_set_state(self, attr_id: int, attr_name: str, value: Any):
|
|
||||||
"""Handle state update from channel."""
|
|
||||||
self._state = bool(value)
|
|
||||||
self.async_write_ha_state()
|
|
||||||
|
|
||||||
async def async_added_to_hass(self) -> None:
|
|
||||||
"""Run when about to be added to hass."""
|
|
||||||
await super().async_added_to_hass()
|
|
||||||
self.async_accept_signal(
|
|
||||||
self._on_off_channel, SIGNAL_ATTR_UPDATED, self.async_set_state
|
|
||||||
)
|
|
||||||
|
|
||||||
@callback
|
|
||||||
def async_restore_last_state(self, last_state) -> None:
|
|
||||||
"""Restore previous state."""
|
|
||||||
self._state = last_state.state == STATE_ON
|
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
|
||||||
"""Attempt to retrieve on off state from the switch."""
|
|
||||||
await super().async_update()
|
|
||||||
if self._on_off_channel:
|
|
||||||
state = await self._on_off_channel.get_attribute_value("on_off")
|
|
||||||
if state is not None:
|
|
||||||
self._state = state
|
|
||||||
|
|
||||||
|
|
||||||
@GROUP_MATCH()
|
|
||||||
class SwitchGroup(BaseSwitch, ZhaGroupEntity):
|
|
||||||
"""Representation of a switch group."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self, entity_ids: list[str], unique_id: str, group_id: int, zha_device, **kwargs
|
|
||||||
) -> None:
|
|
||||||
"""Initialize a switch group."""
|
|
||||||
super().__init__(entity_ids, unique_id, group_id, zha_device, **kwargs)
|
|
||||||
self._available: bool = False
|
|
||||||
group = self.zha_device.gateway.get_group(self._group_id)
|
|
||||||
self._on_off_channel = group.endpoint[OnOff.cluster_id]
|
|
||||||
|
|
||||||
async def async_update(self) -> None:
|
async def async_update(self) -> None:
|
||||||
"""Query all members and determine the light group state."""
|
"""Query all members and determine the light group state."""
|
||||||
all_states = [self.hass.states.get(x) for x in self._entity_ids]
|
all_states = [self.hass.states.get(x) for x in self._entity_ids]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user