Refactor Tibber realtime entity creation (#118031)

This commit is contained in:
epenet 2024-06-21 16:52:09 +02:00 committed by GitHub
parent 4110f4f393
commit f03759295f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Callable
import datetime import datetime
from datetime import timedelta from datetime import timedelta
import logging import logging
@ -291,9 +292,12 @@ async def async_setup_entry(
) )
if home.has_real_time_consumption: if home.has_real_time_consumption:
entity_creator = TibberRtEntityCreator(
async_add_entities, home, entity_registry
)
await home.rt_subscribe( await home.rt_subscribe(
TibberRtDataCoordinator( TibberRtDataCoordinator(
async_add_entities, home, hass entity_creator.add_sensors, home, hass
).async_set_updated_data ).async_set_updated_data
) )
@ -520,38 +524,20 @@ class TibberSensorRT(TibberSensor, CoordinatorEntity["TibberRtDataCoordinator"])
self.async_write_ha_state() self.async_write_ha_state()
class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-enforce-coordinator-module class TibberRtEntityCreator:
"""Handle Tibber realtime data.""" """Create realtime Tibber entities."""
def __init__( def __init__(
self, self,
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback,
tibber_home: tibber.TibberHome, tibber_home: tibber.TibberHome,
hass: HomeAssistant, entity_registry: er.EntityRegistry,
) -> None: ) -> None:
"""Initialize the data handler.""" """Initialize the data handler."""
self._async_add_entities = async_add_entities self._async_add_entities = async_add_entities
self._tibber_home = tibber_home self._tibber_home = tibber_home
self.hass = hass
self._added_sensors: set[str] = set() self._added_sensors: set[str] = set()
super().__init__( self._entity_registry = entity_registry
hass,
_LOGGER,
name=tibber_home.info["viewer"]["home"]["address"].get(
"address1", "Tibber"
),
)
self._async_remove_device_updates_handler = self.async_add_listener(
self._add_sensors
)
self.entity_registry = er.async_get(hass)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
@callback
def _handle_ha_stop(self, _event: Event) -> None:
"""Handle Home Assistant stopping."""
self._async_remove_device_updates_handler()
@callback @callback
def _migrate_unique_id(self, sensor_description: SensorEntityDescription) -> None: def _migrate_unique_id(self, sensor_description: SensorEntityDescription) -> None:
@ -561,19 +547,19 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
description_key = sensor_description.key description_key = sensor_description.key
entity_id: str | None = None entity_id: str | None = None
if translation_key in RT_SENSORS_UNIQUE_ID_MIGRATION_SIMPLE: if translation_key in RT_SENSORS_UNIQUE_ID_MIGRATION_SIMPLE:
entity_id = self.entity_registry.async_get_entity_id( entity_id = self._entity_registry.async_get_entity_id(
"sensor", "sensor",
TIBBER_DOMAIN, TIBBER_DOMAIN,
f"{home_id}_rt_{translation_key.replace('_', ' ')}", f"{home_id}_rt_{translation_key.replace('_', ' ')}",
) )
elif translation_key in RT_SENSORS_UNIQUE_ID_MIGRATION: elif translation_key in RT_SENSORS_UNIQUE_ID_MIGRATION:
entity_id = self.entity_registry.async_get_entity_id( entity_id = self._entity_registry.async_get_entity_id(
"sensor", "sensor",
TIBBER_DOMAIN, TIBBER_DOMAIN,
f"{home_id}_rt_{RT_SENSORS_UNIQUE_ID_MIGRATION[translation_key]}", f"{home_id}_rt_{RT_SENSORS_UNIQUE_ID_MIGRATION[translation_key]}",
) )
elif translation_key != description_key: elif translation_key != description_key:
entity_id = self.entity_registry.async_get_entity_id( entity_id = self._entity_registry.async_get_entity_id(
"sensor", "sensor",
TIBBER_DOMAIN, TIBBER_DOMAIN,
f"{home_id}_rt_{translation_key}", f"{home_id}_rt_{translation_key}",
@ -590,18 +576,17 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
new_unique_id, new_unique_id,
) )
try: try:
self.entity_registry.async_update_entity( self._entity_registry.async_update_entity(
entity_id, new_unique_id=new_unique_id entity_id, new_unique_id=new_unique_id
) )
except ValueError as err: except ValueError as err:
_LOGGER.error(err) _LOGGER.error(err)
@callback @callback
def _add_sensors(self) -> None: def add_sensors(
self, coordinator: TibberRtDataCoordinator, live_measurement: Any
) -> None:
"""Add sensor.""" """Add sensor."""
if not (live_measurement := self.get_live_measurement()):
return
new_entities = [] new_entities = []
for sensor_description in RT_SENSORS: for sensor_description in RT_SENSORS:
if sensor_description.key in self._added_sensors: if sensor_description.key in self._added_sensors:
@ -615,13 +600,49 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
self._tibber_home, self._tibber_home,
sensor_description, sensor_description,
state, state,
self, coordinator,
) )
new_entities.append(entity) new_entities.append(entity)
self._added_sensors.add(sensor_description.key) self._added_sensors.add(sensor_description.key)
if new_entities: if new_entities:
self._async_add_entities(new_entities) self._async_add_entities(new_entities)
class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-enforce-coordinator-module
"""Handle Tibber realtime data."""
def __init__(
self,
add_sensor_callback: Callable[[TibberRtDataCoordinator, Any], None],
tibber_home: tibber.TibberHome,
hass: HomeAssistant,
) -> None:
"""Initialize the data handler."""
self._add_sensor_callback = add_sensor_callback
super().__init__(
hass,
_LOGGER,
name=tibber_home.info["viewer"]["home"]["address"].get(
"address1", "Tibber"
),
)
self._async_remove_device_updates_handler = self.async_add_listener(
self._data_updated
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._handle_ha_stop)
@callback
def _handle_ha_stop(self, _event: Event) -> None:
"""Handle Home Assistant stopping."""
self._async_remove_device_updates_handler()
@callback
def _data_updated(self) -> None:
"""Triggered when data is updated."""
if live_measurement := self.get_live_measurement():
self._add_sensor_callback(self, live_measurement)
def get_live_measurement(self) -> Any: def get_live_measurement(self) -> Any:
"""Get live measurement data.""" """Get live measurement data."""
if errors := self.data.get("errors"): if errors := self.data.get("errors"):