Clean up light group (#51922)

This commit is contained in:
Franck Nijhof 2021-06-16 14:30:25 +02:00 committed by GitHub
parent 7ad91fdf71
commit 4655e3aa08
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from collections import Counter from collections import Counter
from collections.abc import Iterator from collections.abc import Iterator
import itertools import itertools
from typing import Any, Callable, cast from typing import Any, Callable, Set, cast
import voluptuous as vol import voluptuous as vol
@ -42,16 +42,14 @@ from homeassistant.const import (
STATE_ON, STATE_ON,
STATE_UNAVAILABLE, STATE_UNAVAILABLE,
) )
from homeassistant.core import CoreState, HomeAssistant, State from homeassistant.core import CoreState, Event, HomeAssistant, State
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from . import GroupEntity from . import GroupEntity
# mypy: allow-incomplete-defs, allow-untyped-calls, allow-untyped-defs
# mypy: no-check-untyped-defs
DEFAULT_NAME = "Light Group" DEFAULT_NAME = "Light Group"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
@ -67,7 +65,10 @@ SUPPORT_GROUP_LIGHT = (
async def async_setup_platform( async def async_setup_platform(
hass: HomeAssistant, config: ConfigType, async_add_entities, discovery_info=None hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Initialize light.group platform.""" """Initialize light.group platform."""
async_add_entities( async_add_entities(
@ -78,33 +79,25 @@ async def async_setup_platform(
class LightGroup(GroupEntity, light.LightEntity): class LightGroup(GroupEntity, light.LightEntity):
"""Representation of a light group.""" """Representation of a light group."""
_attr_available = False
_attr_icon = "mdi:lightbulb-group"
_attr_is_on = False
_attr_max_mireds = 500
_attr_min_mireds = 154
_attr_should_poll = False
def __init__(self, name: str, entity_ids: list[str]) -> None: def __init__(self, name: str, entity_ids: list[str]) -> None:
"""Initialize a light group.""" """Initialize a light group."""
self._name = name
self._entity_ids = entity_ids self._entity_ids = entity_ids
self._is_on = False
self._available = False
self._icon = "mdi:lightbulb-group"
self._brightness: int | None = None
self._color_mode: str | None = None
self._hs_color: tuple[float, float] | None = None
self._rgb_color: tuple[int, int, int] | None = None
self._rgbw_color: tuple[int, int, int, int] | None = None
self._rgbww_color: tuple[int, int, int, int, int] | None = None
self._xy_color: tuple[float, float] | None = None
self._color_temp: int | None = None
self._min_mireds: int = 154
self._max_mireds: int = 500
self._white_value: int | None = None self._white_value: int | None = None
self._effect_list: list[str] | None = None
self._effect: str | None = None self._attr_name = name
self._supported_color_modes: set[str] | None = None self._attr_extra_state_attributes = {ATTR_ENTITY_ID: entity_ids}
self._supported_features: int = 0
async def async_added_to_hass(self) -> None: async def async_added_to_hass(self) -> None:
"""Register callbacks.""" """Register callbacks."""
async def async_state_changed_listener(event): async def async_state_changed_listener(event: Event) -> None:
"""Handle child updates.""" """Handle child updates."""
self.async_set_context(event.context) self.async_set_context(event.context)
await self.async_defer_or_update_ha_state() await self.async_defer_or_update_ha_state()
@ -121,112 +114,12 @@ class LightGroup(GroupEntity, light.LightEntity):
await super().async_added_to_hass() await super().async_added_to_hass()
@property
def name(self) -> str:
"""Return the name of the entity."""
return self._name
@property
def is_on(self) -> bool:
"""Return the on/off state of the light group."""
return self._is_on
@property
def available(self) -> bool:
"""Return whether the light group is available."""
return self._available
@property
def icon(self):
"""Return the light group icon."""
return self._icon
@property
def brightness(self) -> int | None:
"""Return the brightness of this light group between 0..255."""
return self._brightness
@property
def color_mode(self) -> str | None:
"""Return the color mode of the light."""
return self._color_mode
@property
def hs_color(self) -> tuple[float, float] | None:
"""Return the HS color value [float, float]."""
return self._hs_color
@property
def rgb_color(self) -> tuple[int, int, int] | None:
"""Return the rgb color value [int, int, int]."""
return self._rgb_color
@property
def rgbw_color(self) -> tuple[int, int, int, int] | None:
"""Return the rgbw color value [int, int, int, int]."""
return self._rgbw_color
@property
def rgbww_color(self) -> tuple[int, int, int, int, int] | None:
"""Return the rgbww color value [int, int, int, int, int]."""
return self._rgbww_color
@property
def xy_color(self) -> tuple[float, float] | None:
"""Return the xy color value [float, float]."""
return self._xy_color
@property
def color_temp(self) -> int | None:
"""Return the CT color value in mireds."""
return self._color_temp
@property
def min_mireds(self) -> int:
"""Return the coldest color_temp that this light group supports."""
return self._min_mireds
@property
def max_mireds(self) -> int:
"""Return the warmest color_temp that this light group supports."""
return self._max_mireds
@property @property
def white_value(self) -> int | None: def white_value(self) -> int | None:
"""Return the white value of this light group between 0..255.""" """Return the white value of this light group between 0..255."""
return self._white_value return self._white_value
@property async def async_turn_on(self, **kwargs: Any) -> None:
def effect_list(self) -> list[str] | None:
"""Return the list of supported effects."""
return self._effect_list
@property
def effect(self) -> str | None:
"""Return the current effect."""
return self._effect
@property
def supported_color_modes(self) -> set | None:
"""Flag supported color modes."""
return self._supported_color_modes
@property
def supported_features(self) -> int:
"""Flag supported features."""
return self._supported_features
@property
def should_poll(self) -> bool:
"""No polling needed for a light group."""
return False
@property
def extra_state_attributes(self):
"""Return the state attributes for the light group."""
return {ATTR_ENTITY_ID: self._entity_ids}
async def async_turn_on(self, **kwargs):
"""Forward the turn_on command to all lights in the light group.""" """Forward the turn_on command to all lights in the light group."""
data = {ATTR_ENTITY_ID: self._entity_ids} data = {ATTR_ENTITY_ID: self._entity_ids}
@ -271,7 +164,7 @@ class LightGroup(GroupEntity, light.LightEntity):
context=self._context, context=self._context,
) )
async def async_turn_off(self, **kwargs): async def async_turn_off(self, **kwargs: Any) -> None:
"""Forward the turn_off command to all lights in the light group.""" """Forward the turn_off command to all lights in the light group."""
data = {ATTR_ENTITY_ID: self._entity_ids} data = {ATTR_ENTITY_ID: self._entity_ids}
@ -286,57 +179,60 @@ class LightGroup(GroupEntity, light.LightEntity):
context=self._context, context=self._context,
) )
async def async_update(self): async def async_update(self) -> None:
"""Query all members and determine the light group state.""" """Query all members and determine the light group state."""
all_states = [self.hass.states.get(x) for x in self._entity_ids] all_states = [self.hass.states.get(x) for x in self._entity_ids]
states: list[State] = list(filter(None, all_states)) states: list[State] = list(filter(None, all_states))
on_states = [state for state in states if state.state == STATE_ON] on_states = [state for state in states if state.state == STATE_ON]
self._is_on = len(on_states) > 0 self._attr_is_on = len(on_states) > 0
self._available = any(state.state != STATE_UNAVAILABLE for state in states) self._attr_available = any(state.state != STATE_UNAVAILABLE for state in states)
self._attr_brightness = _reduce_attribute(on_states, ATTR_BRIGHTNESS)
self._brightness = _reduce_attribute(on_states, ATTR_BRIGHTNESS) self._attr_hs_color = _reduce_attribute(
on_states, ATTR_HS_COLOR, reduce=_mean_tuple
self._hs_color = _reduce_attribute(on_states, ATTR_HS_COLOR, reduce=_mean_tuple) )
self._rgb_color = _reduce_attribute( self._attr_rgb_color = _reduce_attribute(
on_states, ATTR_RGB_COLOR, reduce=_mean_tuple on_states, ATTR_RGB_COLOR, reduce=_mean_tuple
) )
self._rgbw_color = _reduce_attribute( self._attr_rgbw_color = _reduce_attribute(
on_states, ATTR_RGBW_COLOR, reduce=_mean_tuple on_states, ATTR_RGBW_COLOR, reduce=_mean_tuple
) )
self._rgbww_color = _reduce_attribute( self._attr_rgbww_color = _reduce_attribute(
on_states, ATTR_RGBWW_COLOR, reduce=_mean_tuple on_states, ATTR_RGBWW_COLOR, reduce=_mean_tuple
) )
self._xy_color = _reduce_attribute(on_states, ATTR_XY_COLOR, reduce=_mean_tuple) self._attr_xy_color = _reduce_attribute(
on_states, ATTR_XY_COLOR, reduce=_mean_tuple
)
self._white_value = _reduce_attribute(on_states, ATTR_WHITE_VALUE) self._white_value = _reduce_attribute(on_states, ATTR_WHITE_VALUE)
self._color_temp = _reduce_attribute(on_states, ATTR_COLOR_TEMP) self._attr_color_temp = _reduce_attribute(on_states, ATTR_COLOR_TEMP)
self._min_mireds = _reduce_attribute( self._attr_min_mireds = _reduce_attribute(
states, ATTR_MIN_MIREDS, default=154, reduce=min states, ATTR_MIN_MIREDS, default=154, reduce=min
) )
self._max_mireds = _reduce_attribute( self._attr_max_mireds = _reduce_attribute(
states, ATTR_MAX_MIREDS, default=500, reduce=max states, ATTR_MAX_MIREDS, default=500, reduce=max
) )
self._effect_list = None self._attr_effect_list = None
all_effect_lists = list(_find_state_attributes(states, ATTR_EFFECT_LIST)) all_effect_lists = list(_find_state_attributes(states, ATTR_EFFECT_LIST))
if all_effect_lists: if all_effect_lists:
# Merge all effects from all effect_lists with a union merge. # Merge all effects from all effect_lists with a union merge.
self._effect_list = list(set().union(*all_effect_lists)) self._attr_effect_list = list(set().union(*all_effect_lists))
self._effect_list.sort() self._attr_effect_list.sort()
if "None" in self._effect_list: if "None" in self._attr_effect_list:
self._effect_list.remove("None") self._attr_effect_list.remove("None")
self._effect_list.insert(0, "None") self._attr_effect_list.insert(0, "None")
self._effect = None self._attr_effect = None
all_effects = list(_find_state_attributes(on_states, ATTR_EFFECT)) all_effects = list(_find_state_attributes(on_states, ATTR_EFFECT))
if all_effects: if all_effects:
# Report the most common effect. # Report the most common effect.
effects_count = Counter(itertools.chain(all_effects)) effects_count = Counter(itertools.chain(all_effects))
self._effect = effects_count.most_common(1)[0][0] self._attr_effect = effects_count.most_common(1)[0][0]
self._color_mode = None self._attr_color_mode = None
all_color_modes = list(_find_state_attributes(on_states, ATTR_COLOR_MODE)) all_color_modes = list(_find_state_attributes(on_states, ATTR_COLOR_MODE))
if all_color_modes: if all_color_modes:
# Report the most common color mode, select brightness and onoff last # Report the most common color mode, select brightness and onoff last
@ -345,24 +241,26 @@ class LightGroup(GroupEntity, light.LightEntity):
color_mode_count[COLOR_MODE_ONOFF] = -1 color_mode_count[COLOR_MODE_ONOFF] = -1
if COLOR_MODE_BRIGHTNESS in color_mode_count: if COLOR_MODE_BRIGHTNESS in color_mode_count:
color_mode_count[COLOR_MODE_BRIGHTNESS] = 0 color_mode_count[COLOR_MODE_BRIGHTNESS] = 0
self._color_mode = color_mode_count.most_common(1)[0][0] self._attr_color_mode = color_mode_count.most_common(1)[0][0]
self._supported_color_modes = None self._attr_supported_color_modes = None
all_supported_color_modes = list( all_supported_color_modes = list(
_find_state_attributes(states, ATTR_SUPPORTED_COLOR_MODES) _find_state_attributes(states, ATTR_SUPPORTED_COLOR_MODES)
) )
if all_supported_color_modes: if all_supported_color_modes:
# Merge all color modes. # Merge all color modes.
self._supported_color_modes = set().union(*all_supported_color_modes) self._attr_supported_color_modes = cast(
Set[str], set().union(*all_supported_color_modes)
)
self._supported_features = 0 self._attr_supported_features = 0
for support in _find_state_attributes(states, ATTR_SUPPORTED_FEATURES): for support in _find_state_attributes(states, ATTR_SUPPORTED_FEATURES):
# Merge supported features by emulating support for every feature # Merge supported features by emulating support for every feature
# we find. # we find.
self._supported_features |= support self._attr_supported_features |= support
# Bitwise-and the supported features with the GroupedLight's features # Bitwise-and the supported features with the GroupedLight's features
# so that we don't break in the future when a new feature is added. # so that we don't break in the future when a new feature is added.
self._supported_features &= SUPPORT_GROUP_LIGHT self._attr_supported_features &= SUPPORT_GROUP_LIGHT
def _find_state_attributes(states: list[State], key: str) -> Iterator[Any]: def _find_state_attributes(states: list[State], key: str) -> Iterator[Any]:
@ -373,12 +271,12 @@ def _find_state_attributes(states: list[State], key: str) -> Iterator[Any]:
yield value yield value
def _mean_int(*args): def _mean_int(*args: Any) -> int:
"""Return the mean of the supplied values.""" """Return the mean of the supplied values."""
return int(sum(args) / len(args)) return int(sum(args) / len(args))
def _mean_tuple(*args): def _mean_tuple(*args: Any) -> tuple[float | Any, ...]:
"""Return the mean values along the columns of the supplied values.""" """Return the mean values along the columns of the supplied values."""
return tuple(sum(x) / len(x) for x in zip(*args)) return tuple(sum(x) / len(x) for x in zip(*args))