From fb32e745fcaaea451298de0d0ff8a0229b8f49f4 Mon Sep 17 00:00:00 2001 From: Joakim Plate Date: Tue, 27 Sep 2022 08:24:58 +0200 Subject: [PATCH] Listen to out of band coil updates in Nibe Heat Pumps (#78976) Listen to callbacks --- .../components/nibe_heatpump/__init__.py | 92 +++++++++++++++---- 1 file changed, 75 insertions(+), 17 deletions(-) diff --git a/homeassistant/components/nibe_heatpump/__init__.py b/homeassistant/components/nibe_heatpump/__init__.py index 590afba5b79..b9921df4e1e 100644 --- a/homeassistant/components/nibe_heatpump/__init__.py +++ b/homeassistant/components/nibe_heatpump/__init__.py @@ -2,7 +2,10 @@ from __future__ import annotations from collections import defaultdict +from collections.abc import Callable, Iterable from datetime import timedelta +from functools import cached_property +from typing import Any, Generic, TypeVar from nibe.coil import Coil from nibe.connection import Connection @@ -18,7 +21,7 @@ from homeassistant.const import ( EVENT_HOMEASSISTANT_STOP, Platform, ) -from homeassistant.core import CALLBACK_TYPE, HomeAssistant +from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers import device_registry as dr from homeassistant.helpers.entity import DeviceInfo, async_generate_entity_id @@ -105,7 +108,52 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: return unload_ok -class Coordinator(DataUpdateCoordinator[dict[int, Coil]]): +_DataTypeT = TypeVar("_DataTypeT") +_ContextTypeT = TypeVar("_ContextTypeT") + + +class ContextCoordinator( + Generic[_DataTypeT, _ContextTypeT], DataUpdateCoordinator[_DataTypeT] +): + """Update coordinator with context adjustments.""" + + @cached_property + def context_callbacks(self) -> dict[_ContextTypeT, list[CALLBACK_TYPE]]: + """Return a dict of all callbacks registered for a given context.""" + callbacks: dict[_ContextTypeT, list[CALLBACK_TYPE]] = defaultdict(list) + for update_callback, context in list(self._listeners.values()): + assert isinstance(context, set) + for address in context: + callbacks[address].append(update_callback) + return callbacks + + @callback + def async_update_context_listeners(self, contexts: Iterable[_ContextTypeT]) -> None: + """Update all listeners given a set of contexts.""" + update_callbacks: set[CALLBACK_TYPE] = set() + for context in contexts: + update_callbacks.update(self.context_callbacks.get(context, [])) + + for update_callback in update_callbacks: + update_callback() + + @callback + def async_add_listener( + self, update_callback: CALLBACK_TYPE, context: Any = None + ) -> Callable[[], None]: + """Wrap standard function to prune cached callback database.""" + release = super().async_add_listener(update_callback, context) + self.__dict__.pop("context_callbacks", None) + + @callback + def release_update(): + release() + self.__dict__.pop("context_callbacks", None) + + return release_update + + +class Coordinator(ContextCoordinator[dict[int, Coil], int]): """Update coordinator for nibe heat pumps.""" config_entry: ConfigEntry @@ -122,9 +170,18 @@ class Coordinator(DataUpdateCoordinator[dict[int, Coil]]): ) self.data = {} + self.seed: dict[int, Coil] = {} self.connection = connection self.heatpump = heatpump + heatpump.subscribe(heatpump.COIL_UPDATE_EVENT, self._on_coil_update) + + def _on_coil_update(self, coil: Coil): + """Handle callback on coil updates.""" + self.data[coil.address] = coil + self.seed[coil.address] = coil + self.async_update_context_listeners([coil.address]) + @property def coils(self) -> list[Coil]: """Return the full coil database.""" @@ -157,9 +214,9 @@ class Coordinator(DataUpdateCoordinator[dict[int, Coil]]): coil.value = value coil = await self.connection.write_coil(coil) - if self.data: - self.data[coil.address] = coil - self.async_update_listeners() + self.data[coil.address] = coil + + self.async_update_context_listeners([coil.address]) async def _async_update_data(self) -> dict[int, Coil]: @retry( @@ -169,25 +226,26 @@ class Coordinator(DataUpdateCoordinator[dict[int, Coil]]): async def read_coil(coil: Coil): return await self.connection.read_coil(coil) - callbacks: dict[int, list[CALLBACK_TYPE]] = defaultdict(list) - for update_callback, context in list(self._listeners.values()): - assert isinstance(context, set) - for address in context: - callbacks[address].append(update_callback) - result: dict[int, Coil] = {} - for address, callback_list in callbacks.items(): + for address in self.context_callbacks.keys(): + if seed := self.seed.pop(address, None): + self.logger.debug("Skipping seeded coil: %d", address) + result[address] = seed + continue + try: coil = self.heatpump.get_coil_by_address(address) - self.data[coil.address] = result[coil.address] = await read_coil(coil) - except (CoilReadException, RetryError) as exception: - raise UpdateFailed(f"Failed to update: {exception}") from exception except CoilNotFoundException as exception: self.logger.debug("Skipping missing coil: %s", exception) + continue - for update_callback in callback_list: - update_callback() + try: + result[coil.address] = await read_coil(coil) + except (CoilReadException, RetryError) as exception: + raise UpdateFailed(f"Failed to update: {exception}") from exception + + self.seed.pop(coil.address, None) return result