Refactor ZHA light initialization (#43149)

* Refactor ZHA light initialization

* Don't do redundant attribute reads
This commit is contained in:
Alexei Chetroi 2020-11-14 17:24:41 -05:00 committed by GitHub
parent cd42d82f9d
commit 84569549f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 66 deletions

View File

@ -3,7 +3,7 @@ from typing import Optional
import zigpy.zcl.clusters.lighting as lighting import zigpy.zcl.clusters.lighting as lighting
from .. import registries, typing as zha_typing from .. import registries
from ..const import REPORT_CONFIG_DEFAULT from ..const import REPORT_CONFIG_DEFAULT
from .base import ClientChannel, ZigbeeChannel from .base import ClientChannel, ZigbeeChannel
@ -32,15 +32,19 @@ class ColorChannel(ZigbeeChannel):
{"attr": "current_y", "config": REPORT_CONFIG_DEFAULT}, {"attr": "current_y", "config": REPORT_CONFIG_DEFAULT},
{"attr": "color_temperature", "config": REPORT_CONFIG_DEFAULT}, {"attr": "color_temperature", "config": REPORT_CONFIG_DEFAULT},
) )
MAX_MIREDS: int = 500
MIN_MIREDS: int = 153
def __init__( @property
self, cluster: zha_typing.ZigpyClusterType, ch_pool: zha_typing.ChannelPoolType def color_capabilities(self) -> int:
) -> None: """Return color capabilities of the light."""
"""Initialize ColorChannel.""" try:
super().__init__(cluster, ch_pool) return self.cluster["color_capabilities"]
self._color_capabilities = None except KeyError:
self._min_mireds = 153 pass
self._max_mireds = 500 if self.cluster.get("color_temperature") is not None:
return self.CAPABILITIES_COLOR_XY | self.CAPABILITIES_COLOR_TEMP
return self.CAPABILITIES_COLOR_XY
@property @property
def color_loop_active(self) -> Optional[int]: def color_loop_active(self) -> Optional[int]:
@ -65,49 +69,30 @@ class ColorChannel(ZigbeeChannel):
@property @property
def min_mireds(self) -> int: def min_mireds(self) -> int:
"""Return the coldest color_temp that this channel supports.""" """Return the coldest color_temp that this channel supports."""
return self.cluster.get("color_temp_physical_min", self._min_mireds) return self.cluster.get("color_temp_physical_min", self.MIN_MIREDS)
@property @property
def max_mireds(self) -> int: def max_mireds(self) -> int:
"""Return the warmest color_temp that this channel supports.""" """Return the warmest color_temp that this channel supports."""
return self.cluster.get("color_temp_physical_max", self._max_mireds) return self.cluster.get("color_temp_physical_max", self.MAX_MIREDS)
def get_color_capabilities(self): async def async_configure(self) -> None:
"""Return the color capabilities."""
return self._color_capabilities
async def async_configure(self):
"""Configure channel.""" """Configure channel."""
await self.fetch_color_capabilities(False) await self.fetch_color_capabilities(False)
await super().async_configure() await super().async_configure()
async def async_initialize(self, from_cache): async def async_initialize(self, from_cache: bool) -> None:
"""Initialize channel.""" """Initialize channel."""
await self.fetch_color_capabilities(True) await self.fetch_color_capabilities(True)
attributes = ["color_temperature", "current_x", "current_y"] await super().async_initialize(from_cache)
await self.get_attributes(attributes, from_cache=from_cache)
async def fetch_color_capabilities(self, from_cache): async def fetch_color_capabilities(self, from_cache: bool) -> None:
"""Get the color configuration.""" """Get the color configuration."""
attributes = [ attributes = [
"color_temp_physical_min", "color_temp_physical_min",
"color_temp_physical_max", "color_temp_physical_max",
"color_capabilities", "color_capabilities",
"color_temperature",
] ]
results = await self.get_attributes(attributes, from_cache=from_cache) # just populates the cache, if not already done
capabilities = results.get("color_capabilities") await self.get_attributes(attributes, from_cache=from_cache)
if capabilities is None:
# ZCL Version 4 devices don't support the color_capabilities
# attribute. In this version XY support is mandatory, but we
# need to probe to determine if the device supports color
# temperature.
capabilities = self.CAPABILITIES_COLOR_XY
result = await self.get_attribute_value(
"color_temperature", from_cache=from_cache
)
if result is not None and result is not self.UNSUPPORTED_ATTRIBUTE:
capabilities |= self.CAPABILITIES_COLOR_TEMP
self._color_capabilities = capabilities
await super().async_initialize(from_cache)

View File

@ -344,7 +344,7 @@ class Light(BaseLight, ZhaEntity):
self._brightness = self._level_channel.current_level self._brightness = self._level_channel.current_level
if self._color_channel: if self._color_channel:
color_capabilities = self._color_channel.get_color_capabilities() color_capabilities = self._color_channel.color_capabilities
if color_capabilities & CAPABILITIES_COLOR_TEMP: if color_capabilities & CAPABILITIES_COLOR_TEMP:
self._supported_features |= light.SUPPORT_COLOR_TEMP self._supported_features |= light.SUPPORT_COLOR_TEMP
self._color_temp = self._color_channel.color_temperature self._color_temp = self._color_channel.color_temperature
@ -439,34 +439,20 @@ class Light(BaseLight, ZhaEntity):
if level is not None: if level is not None:
self._brightness = level self._brightness = level
if self._color_channel: if self._color_channel:
attributes = [] attributes = [
color_capabilities = self._color_channel.get_color_capabilities() "color_temperature",
if ( "current_x",
color_capabilities is not None "current_y",
and color_capabilities & CAPABILITIES_COLOR_TEMP "color_loop_active",
): ]
attributes.append("color_temperature")
if (
color_capabilities is not None
and color_capabilities & CAPABILITIES_COLOR_XY
):
attributes.append("current_x")
attributes.append("current_y")
if (
color_capabilities is not None
and color_capabilities & CAPABILITIES_COLOR_LOOP
):
attributes.append("color_loop_active")
results = await self._color_channel.get_attributes( results = await self._color_channel.get_attributes(
attributes, from_cache=from_cache attributes, from_cache=from_cache
) )
if ( color_temp = results.get("color_temperature")
"color_temperature" in results if color_temp is not None:
and results["color_temperature"] is not None self._color_temp = color_temp
):
self._color_temp = results["color_temperature"]
color_x = results.get("current_x") color_x = results.get("current_x")
color_y = results.get("current_y") color_y = results.get("current_y")
@ -474,13 +460,13 @@ class Light(BaseLight, ZhaEntity):
self._hs_color = color_util.color_xy_to_hs( self._hs_color = color_util.color_xy_to_hs(
float(color_x / 65535), float(color_y / 65535) float(color_x / 65535), float(color_y / 65535)
) )
if (
"color_loop_active" in results color_loop_active = results.get("color_loop_active")
and results["color_loop_active"] is not None if color_loop_active is not None:
):
color_loop_active = results["color_loop_active"]
if color_loop_active == 1: if color_loop_active == 1:
self._effect = light.EFFECT_COLORLOOP self._effect = light.EFFECT_COLORLOOP
else:
self._effect = None
async def _refresh(self, time): async def _refresh(self, time):
"""Call async_get_state at an interval.""" """Call async_get_state at an interval."""