Add coordinator to Twinkly (#133793)

This commit is contained in:
Joost Lekkerkerker 2024-12-23 11:35:37 +01:00 committed by GitHub
parent e3cf5c47b2
commit 939365887f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 222 additions and 207 deletions

View File

@ -1,8 +1,6 @@
"""The twinkly component."""
from dataclasses import dataclass
import logging
from typing import Any
from aiohttp import ClientError
from ttls.client import Twinkly
@ -10,27 +8,18 @@ from ttls.client import Twinkly
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import ATTR_VERSION, DOMAIN
from .const import DOMAIN
from .coordinator import TwinklyCoordinator
PLATFORMS = [Platform.LIGHT]
_LOGGER = logging.getLogger(__name__)
@dataclass
class TwinklyData:
"""Data for Twinkly integration."""
client: Twinkly
device_info: dict[str, Any]
sw_version: str | None
type TwinklyConfigEntry = ConfigEntry[TwinklyData]
type TwinklyConfigEntry = ConfigEntry[TwinklyCoordinator]
async def async_setup_entry(hass: HomeAssistant, entry: TwinklyConfigEntry) -> bool:
@ -41,15 +30,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: TwinklyConfigEntry) -> b
client = Twinkly(host, async_get_clientsession(hass))
try:
device_info = await client.get_details()
software_version = await client.get_firmware_version()
except (TimeoutError, ClientError) as exception:
raise ConfigEntryNotReady from exception
coordinator = TwinklyCoordinator(hass, client)
entry.runtime_data = TwinklyData(
client, device_info, software_version.get(ATTR_VERSION)
)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

View File

@ -0,0 +1,102 @@
"""Coordinator for Twinkly."""
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Any
from aiohttp import ClientError
from awesomeversion import AwesomeVersion
from ttls.client import Twinkly, TwinklyError
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DEV_NAME, DOMAIN, MIN_EFFECT_VERSION
_LOGGER = logging.getLogger(__name__)
@dataclass
class TwinklyData:
"""Class for Twinkly data."""
device_info: dict[str, Any]
brightness: int
is_on: bool
movies: dict[int, str]
current_movie: int | None
class TwinklyCoordinator(DataUpdateCoordinator[TwinklyData]):
"""Class to manage fetching Twinkly data from API."""
software_version: str
supports_effects: bool
device_name: str
def __init__(self, hass: HomeAssistant, client: Twinkly) -> None:
"""Initialize global Twinkly data updater."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=timedelta(seconds=30),
)
self.client = client
async def _async_setup(self) -> None:
"""Set up the Twinkly data."""
try:
software_version = await self.client.get_firmware_version()
self.device_name = (await self.client.get_details())[DEV_NAME]
except (TimeoutError, ClientError) as exception:
raise UpdateFailed from exception
self.software_version = software_version["version"]
self.supports_effects = AwesomeVersion(self.software_version) >= AwesomeVersion(
MIN_EFFECT_VERSION
)
async def _async_update_data(self) -> TwinklyData:
"""Fetch data from Twinkly."""
movies: list[dict[str, Any]] = []
current_movie: dict[str, Any] = {}
try:
device_info = await self.client.get_details()
brightness = await self.client.get_brightness()
is_on = await self.client.is_on()
if self.supports_effects:
movies = (await self.client.get_saved_movies())["movies"]
except (TimeoutError, ClientError) as exception:
raise UpdateFailed from exception
if self.supports_effects:
try:
current_movie = await self.client.get_current_movie()
except (TwinklyError, TimeoutError, ClientError) as exception:
_LOGGER.debug("Error fetching current movie: %s", exception)
brightness = (
int(brightness["value"]) if brightness["mode"] == "enabled" else 100
)
brightness = int(round(brightness * 2.55)) if is_on else 0
if self.device_name != device_info[DEV_NAME]:
self._async_update_device_info(device_info[DEV_NAME])
return TwinklyData(
device_info,
brightness,
is_on,
{movie["id"]: movie["name"] for movie in movies},
current_movie.get("id"),
)
def _async_update_device_info(self, name: str) -> None:
"""Update the device info."""
device_registry = dr.async_get(self.hass)
device = device_registry.async_get_device(
identifiers={(DOMAIN, self.data.device_info["mac"])},
)
if device:
device_registry.async_update_device(
device.id,
name=name,
)

View File

@ -34,8 +34,8 @@ async def async_get_config_entry_diagnostics(
return async_redact_data(
{
"entry": entry.as_dict(),
"device_info": entry.runtime_data.device_info,
ATTR_SW_VERSION: entry.runtime_data.sw_version,
"device_info": entry.runtime_data.data.device_info,
ATTR_SW_VERSION: entry.runtime_data.software_version,
"attributes": attributes,
},
TO_REDACT,

View File

@ -5,9 +5,6 @@ from __future__ import annotations
import logging
from typing import Any
from aiohttp import ClientError
from awesomeversion import AwesomeVersion
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_EFFECT,
@ -17,13 +14,12 @@ from homeassistant.components.light import (
LightEntity,
LightEntityFeature,
)
from homeassistant.const import CONF_HOST, CONF_ID, CONF_MODEL, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC, DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import TwinklyConfigEntry
from . import TwinklyConfigEntry, TwinklyCoordinator
from .const import (
DEV_LED_PROFILE,
DEV_MODEL,
@ -31,7 +27,6 @@ from .const import (
DEV_PROFILE_RGB,
DEV_PROFILE_RGBW,
DOMAIN,
MIN_EFFECT_VERSION,
)
_LOGGER = logging.getLogger(__name__)
@ -43,26 +38,23 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Setups an entity from a config entry (UI config flow)."""
entity = TwinklyLight(config_entry)
entity = TwinklyLight(config_entry.runtime_data)
async_add_entities([entity], update_before_add=True)
class TwinklyLight(LightEntity):
class TwinklyLight(CoordinatorEntity[TwinklyCoordinator], LightEntity):
"""Implementation of the light for the Twinkly service."""
_attr_has_entity_name = True
_attr_name = None
_attr_translation_key = "light"
def __init__(
self,
entry: TwinklyConfigEntry,
) -> None:
def __init__(self, coordinator: TwinklyCoordinator) -> None:
"""Initialize a TwinklyLight entity."""
device_info = entry.runtime_data.device_info
self._attr_unique_id: str = device_info["mac"]
self._conf = entry
super().__init__(coordinator)
device_info = coordinator.data.device_info
self._attr_unique_id = mac = device_info["mac"]
if device_info.get(DEV_LED_PROFILE) == DEV_PROFILE_RGBW:
self._attr_supported_color_modes = {ColorMode.RGBW}
@ -75,66 +67,35 @@ class TwinklyLight(LightEntity):
else:
self._attr_supported_color_modes = {ColorMode.BRIGHTNESS}
self._attr_color_mode = ColorMode.BRIGHTNESS
# Those are saved in the config entry in order to have meaningful values even
# if the device is currently offline.
# They are expected to be updated using the device_info.
self._name = entry.data[CONF_NAME] or "Twinkly light"
self._model = entry.data[CONF_MODEL]
self._mac = device_info["mac"]
self._client = entry.runtime_data.client
# Set default state before any update
self._attr_is_on = False
self._attr_available = False
self._current_movie: dict[Any, Any] = {}
self._movies: list[Any] = []
self._software_version = entry.runtime_data.sw_version
# We guess that most devices are "new" and support effects
self._attr_supported_features = LightEntityFeature.EFFECT
@property
def device_info(self) -> DeviceInfo | None:
"""Get device specific attributes."""
return DeviceInfo(
identifiers={(DOMAIN, self._mac)},
connections={(CONNECTION_NETWORK_MAC, self._mac)},
self.client = coordinator.client
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, mac)},
connections={(CONNECTION_NETWORK_MAC, mac)},
manufacturer="LEDWORKS",
model=self._model,
name=self._name,
sw_version=self._software_version,
model=device_info[DEV_MODEL],
name=device_info[DEV_NAME],
sw_version=coordinator.software_version,
)
if coordinator.supports_effects:
self._attr_supported_features = LightEntityFeature.EFFECT
self._update_attr()
@property
def effect(self) -> str | None:
"""Return the current effect."""
if "name" in self._current_movie:
return f"{self._current_movie['id']} {self._current_movie['name']}"
if (current_movie_id := self.coordinator.data.current_movie) is not None:
return (
f"{current_movie_id} {self.coordinator.data.movies[current_movie_id]}"
)
return None
@property
def effect_list(self) -> list[str]:
"""Return the list of saved effects."""
return [f"{movie['id']} {movie['name']}" for movie in self._movies]
async def async_added_to_hass(self) -> None:
"""Device is added to hass."""
if self._software_version:
if AwesomeVersion(self._software_version) < AwesomeVersion(
MIN_EFFECT_VERSION
):
self._attr_supported_features = (
self.supported_features & ~LightEntityFeature.EFFECT
)
device_registry = dr.async_get(self.hass)
device_entry = device_registry.async_get_device(
{(DOMAIN, self._attr_unique_id)}, set()
)
if device_entry:
device_registry.async_update_device(
device_entry.id, sw_version=self._software_version
)
return [
f"{identifier} {name}"
for identifier, name in self.coordinator.data.movies.items()
]
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn device on."""
@ -144,29 +105,29 @@ class TwinklyLight(LightEntity):
# If brightness is 0, the twinkly will only "disable" the brightness,
# which means that it will be 100%.
if brightness == 0:
await self._client.turn_off()
await self.client.turn_off()
return
await self._client.set_brightness(brightness)
await self.client.set_brightness(brightness)
if (
ATTR_RGBW_COLOR in kwargs
and kwargs[ATTR_RGBW_COLOR] != self._attr_rgbw_color
):
await self._client.interview()
await self.client.interview()
if LightEntityFeature.EFFECT & self.supported_features:
# Static color only supports rgb
await self._client.set_static_colour(
await self.client.set_static_colour(
(
kwargs[ATTR_RGBW_COLOR][0],
kwargs[ATTR_RGBW_COLOR][1],
kwargs[ATTR_RGBW_COLOR][2],
)
)
await self._client.set_mode("color")
self._client.default_mode = "color"
await self.client.set_mode("color")
self.client.default_mode = "color"
else:
await self._client.set_cycle_colours(
await self.client.set_cycle_colours(
(
kwargs[ATTR_RGBW_COLOR][3],
kwargs[ATTR_RGBW_COLOR][0],
@ -174,20 +135,20 @@ class TwinklyLight(LightEntity):
kwargs[ATTR_RGBW_COLOR][2],
)
)
await self._client.set_mode("movie")
self._client.default_mode = "movie"
await self.client.set_mode("movie")
self.client.default_mode = "movie"
self._attr_rgbw_color = kwargs[ATTR_RGBW_COLOR]
if ATTR_RGB_COLOR in kwargs and kwargs[ATTR_RGB_COLOR] != self._attr_rgb_color:
await self._client.interview()
await self.client.interview()
if LightEntityFeature.EFFECT & self.supported_features:
await self._client.set_static_colour(kwargs[ATTR_RGB_COLOR])
await self._client.set_mode("color")
self._client.default_mode = "color"
await self.client.set_static_colour(kwargs[ATTR_RGB_COLOR])
await self.client.set_mode("color")
self.client.default_mode = "color"
else:
await self._client.set_cycle_colours(kwargs[ATTR_RGB_COLOR])
await self._client.set_mode("movie")
self._client.default_mode = "movie"
await self.client.set_cycle_colours(kwargs[ATTR_RGB_COLOR])
await self.client.set_mode("movie")
self.client.default_mode = "movie"
self._attr_rgb_color = kwargs[ATTR_RGB_COLOR]
@ -196,100 +157,29 @@ class TwinklyLight(LightEntity):
and LightEntityFeature.EFFECT & self.supported_features
):
movie_id = kwargs[ATTR_EFFECT].split(" ")[0]
if "id" not in self._current_movie or int(movie_id) != int(
self._current_movie["id"]
if (
self.coordinator.data.current_movie is None
or int(movie_id) != self.coordinator.data.current_movie
):
await self._client.interview()
await self._client.set_current_movie(int(movie_id))
await self._client.set_mode("movie")
self._client.default_mode = "movie"
await self.client.interview()
await self.client.set_current_movie(int(movie_id))
await self.client.set_mode("movie")
self.client.default_mode = "movie"
if not self._attr_is_on:
await self._client.turn_on()
await self.client.turn_on()
await self.coordinator.async_refresh()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn device off."""
await self._client.turn_off()
await self.client.turn_off()
await self.coordinator.async_refresh()
async def async_update(self) -> None:
"""Asynchronously updates the device properties."""
_LOGGER.debug("Updating '%s'", self._client.host)
def _update_attr(self) -> None:
"""Update the entity attributes."""
self._attr_is_on = self.coordinator.data.is_on
self._attr_brightness = self.coordinator.data.brightness
try:
self._attr_is_on = await self._client.is_on()
brightness = await self._client.get_brightness()
brightness_value = (
int(brightness["value"]) if brightness["mode"] == "enabled" else 100
)
self._attr_brightness = (
int(round(brightness_value * 2.55)) if self._attr_is_on else 0
)
device_info = await self._client.get_details()
if (
DEV_NAME in device_info
and DEV_MODEL in device_info
and (
device_info[DEV_NAME] != self._name
or device_info[DEV_MODEL] != self._model
)
):
self._name = device_info[DEV_NAME]
self._model = device_info[DEV_MODEL]
# If the name has changed, persist it in conf entry,
# so we will be able to restore this new name if hass
# is started while the LED string is offline.
self.hass.config_entries.async_update_entry(
self._conf,
data={
CONF_HOST: self._client.host, # this cannot change
CONF_ID: self._attr_unique_id, # this cannot change
CONF_NAME: self._name,
CONF_MODEL: self._model,
},
)
device_registry = dr.async_get(self.hass)
device_entry = device_registry.async_get_device(
{(DOMAIN, self._attr_unique_id)}
)
if device_entry:
device_registry.async_update_device(
device_entry.id, name=self._name, model=self._model
)
if LightEntityFeature.EFFECT & self.supported_features:
await self.async_update_movies()
await self.async_update_current_movie()
if not self._attr_available:
_LOGGER.warning("Twinkly '%s' is now available", self._client.host)
# We don't use the echo API to track the availability since
# we already have to pull the device to get its state.
self._attr_available = True
except (TimeoutError, ClientError):
# We log this as "info" as it's pretty common that the Christmas
# light are not reachable in July
if self._attr_available:
_LOGGER.warning(
"Twinkly '%s' is not reachable (client error)", self._client.host
)
self._attr_available = False
async def async_update_movies(self) -> None:
"""Update the list of movies (effects)."""
movies = await self._client.get_saved_movies()
_LOGGER.debug("Movies: %s", movies)
if movies and "movies" in movies:
self._movies = movies["movies"]
async def async_update_current_movie(self) -> None:
"""Update the current active movie."""
current_movie = await self._client.get_current_movie()
_LOGGER.debug("Current movie: %s", current_movie)
if current_movie and "id" in current_movie:
self._current_movie = current_movie
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self._update_attr()
super()._handle_coordinator_update()

View File

@ -10,11 +10,7 @@ from homeassistant.const import CONF_HOST, CONF_ID, CONF_MODEL, CONF_NAME
from .const import TEST_MAC, TEST_MODEL, TEST_NAME
from tests.common import (
MockConfigEntry,
load_json_array_fixture,
load_json_object_fixture,
)
from tests.common import MockConfigEntry, load_json_object_fixture
@pytest.fixture
@ -55,7 +51,7 @@ def mock_twinkly_client() -> Generator[AsyncMock]:
client.get_firmware_version.return_value = load_json_object_fixture(
"get_firmware_version.json", DOMAIN
)
client.get_saved_movies.return_value = load_json_array_fixture(
client.get_saved_movies.return_value = load_json_object_fixture(
"get_saved_movies.json", DOMAIN
)
client.get_current_movie.return_value = load_json_object_fixture(

View File

@ -1,4 +1,12 @@
[
{ "id": 1, "name": "Rainbow" },
{ "id": 2, "name": "Flare" }
]
{
"movies": [
{
"id": 1,
"name": "Rainbow"
},
{
"id": 2,
"name": "Flare"
}
]
}

View File

@ -4,8 +4,10 @@
'attributes': dict({
'brightness': 26,
'color_mode': 'rgb',
'effect': None,
'effect': '1 Rainbow',
'effect_list': list([
'1 Rainbow',
'2 Flare',
]),
'friendly_name': 'Tree 1',
'hs_color': list([

View File

@ -6,6 +6,8 @@
'area_id': None,
'capabilities': dict({
'effect_list': list([
'1 Rainbow',
'2 Flare',
]),
'supported_color_modes': list([
<ColorMode.RGB: 'rgb'>,
@ -43,8 +45,10 @@
'attributes': ReadOnlyDict({
'brightness': 26,
'color_mode': <ColorMode.RGB: 'rgb'>,
'effect': None,
'effect': '1 Rainbow',
'effect_list': list([
'1 Rainbow',
'2 Flare',
]),
'friendly_name': 'Tree 1',
'hs_color': tuple(

View File

@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy import SnapshotAssertion
from ttls.client import TwinklyError
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
@ -25,6 +26,7 @@ from homeassistant.const import (
SERVICE_TURN_OFF,
SERVICE_TURN_ON,
STATE_OFF,
STATE_UNAVAILABLE,
Platform,
)
from homeassistant.core import HomeAssistant
@ -278,6 +280,28 @@ async def test_turn_off(
mock_twinkly_client.turn_off.assert_called_once_with()
async def test_no_current_movie(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_twinkly_client: AsyncMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test handling of missing current movie data."""
await setup_integration(hass, mock_config_entry)
assert hass.states.get("light.tree_1").attributes[ATTR_EFFECT] == "1 Rainbow"
mock_twinkly_client.get_current_movie.side_effect = TwinklyError
freezer.tick(timedelta(seconds=30))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get("light.tree_1").state != STATE_UNAVAILABLE
assert hass.states.get("light.tree_1").attributes[ATTR_EFFECT] is None
async def test_update_name(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
@ -294,6 +318,10 @@ async def test_update_name(
await setup_integration(hass, mock_config_entry)
dev_entry = device_registry.async_get_device({(DOMAIN, TEST_MAC)})
assert dev_entry.name == "Tree 1"
mock_twinkly_client.get_details.return_value["device_name"] = "new_device_name"
freezer.tick(timedelta(seconds=30))