From d4df86da06cf94eb9539dc99152ed12c384fdc19 Mon Sep 17 00:00:00 2001 From: epenet <6771947+epenet@users.noreply.github.com> Date: Fri, 24 May 2024 11:05:54 +0200 Subject: [PATCH] Move TibberDataCoordinator to separate module (#118027) --- .coveragerc | 1 + .../components/tibber/coordinator.py | 163 ++++++++++++++++++ homeassistant/components/tibber/sensor.py | 150 +--------------- tests/components/tibber/test_statistics.py | 2 +- 4 files changed, 168 insertions(+), 148 deletions(-) create mode 100644 homeassistant/components/tibber/coordinator.py diff --git a/.coveragerc b/.coveragerc index 4a1b55c583a..94d445cf8c8 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1428,6 +1428,7 @@ omit = homeassistant/components/thinkingcleaner/* homeassistant/components/thomson/device_tracker.py homeassistant/components/tibber/__init__.py + homeassistant/components/tibber/coordinator.py homeassistant/components/tibber/sensor.py homeassistant/components/tikteck/light.py homeassistant/components/tile/__init__.py diff --git a/homeassistant/components/tibber/coordinator.py b/homeassistant/components/tibber/coordinator.py new file mode 100644 index 00000000000..c3746cb9a58 --- /dev/null +++ b/homeassistant/components/tibber/coordinator.py @@ -0,0 +1,163 @@ +"""Coordinator for Tibber sensors.""" + +from __future__ import annotations + +from datetime import timedelta +import logging +from typing import cast + +import tibber + +from homeassistant.components.recorder import get_instance +from homeassistant.components.recorder.models import StatisticData, StatisticMetaData +from homeassistant.components.recorder.statistics import ( + async_add_external_statistics, + get_last_statistics, + statistics_during_period, +) +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import UnitOfEnergy +from homeassistant.core import HomeAssistant +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed +from homeassistant.util import dt as dt_util + +from .const import DOMAIN as TIBBER_DOMAIN + +FIVE_YEARS = 5 * 365 * 24 + +_LOGGER = logging.getLogger(__name__) + + +class TibberDataCoordinator(DataUpdateCoordinator[None]): + """Handle Tibber data and insert statistics.""" + + config_entry: ConfigEntry + + def __init__(self, hass: HomeAssistant, tibber_connection: tibber.Tibber) -> None: + """Initialize the data handler.""" + super().__init__( + hass, + _LOGGER, + name=f"Tibber {tibber_connection.name}", + update_interval=timedelta(minutes=20), + ) + self._tibber_connection = tibber_connection + + async def _async_update_data(self) -> None: + """Update data via API.""" + try: + await self._tibber_connection.fetch_consumption_data_active_homes() + await self._tibber_connection.fetch_production_data_active_homes() + await self._insert_statistics() + except tibber.RetryableHttpException as err: + raise UpdateFailed(f"Error communicating with API ({err.status})") from err + except tibber.FatalHttpException: + # Fatal error. Reload config entry to show correct error. + self.hass.async_create_task( + self.hass.config_entries.async_reload(self.config_entry.entry_id) + ) + + async def _insert_statistics(self) -> None: + """Insert Tibber statistics.""" + for home in self._tibber_connection.get_homes(): + sensors: list[tuple[str, bool, str]] = [] + if home.hourly_consumption_data: + sensors.append(("consumption", False, UnitOfEnergy.KILO_WATT_HOUR)) + sensors.append(("totalCost", False, home.currency)) + if home.hourly_production_data: + sensors.append(("production", True, UnitOfEnergy.KILO_WATT_HOUR)) + sensors.append(("profit", True, home.currency)) + + for sensor_type, is_production, unit in sensors: + statistic_id = ( + f"{TIBBER_DOMAIN}:energy_" + f"{sensor_type.lower()}_" + f"{home.home_id.replace('-', '')}" + ) + + last_stats = await get_instance(self.hass).async_add_executor_job( + get_last_statistics, self.hass, 1, statistic_id, True, set() + ) + + if not last_stats: + # First time we insert 5 years of data (if available) + hourly_data = await home.get_historic_data( + 5 * 365 * 24, production=is_production + ) + + _sum = 0.0 + last_stats_time = None + else: + # hourly_consumption/production_data contains the last 30 days + # of consumption/production data. + # We update the statistics with the last 30 days + # of data to handle corrections in the data. + hourly_data = ( + home.hourly_production_data + if is_production + else home.hourly_consumption_data + ) + + from_time = dt_util.parse_datetime(hourly_data[0]["from"]) + if from_time is None: + continue + start = from_time - timedelta(hours=1) + stat = await get_instance(self.hass).async_add_executor_job( + statistics_during_period, + self.hass, + start, + None, + {statistic_id}, + "hour", + None, + {"sum"}, + ) + if statistic_id in stat: + first_stat = stat[statistic_id][0] + _sum = cast(float, first_stat["sum"]) + last_stats_time = first_stat["start"] + else: + hourly_data = await home.get_historic_data( + FIVE_YEARS, production=is_production + ) + _sum = 0.0 + last_stats_time = None + + statistics = [] + + last_stats_time_dt = ( + dt_util.utc_from_timestamp(last_stats_time) + if last_stats_time + else None + ) + + for data in hourly_data: + if data.get(sensor_type) is None: + continue + + from_time = dt_util.parse_datetime(data["from"]) + if from_time is None or ( + last_stats_time_dt is not None + and from_time <= last_stats_time_dt + ): + continue + + _sum += data[sensor_type] + + statistics.append( + StatisticData( + start=from_time, + state=data[sensor_type], + sum=_sum, + ) + ) + + metadata = StatisticMetaData( + has_mean=False, + has_sum=True, + name=f"{home.name} {sensor_type}", + source=TIBBER_DOMAIN, + statistic_id=statistic_id, + unit_of_measurement=unit, + ) + async_add_external_statistics(self.hass, metadata, statistics) diff --git a/homeassistant/components/tibber/sensor.py b/homeassistant/components/tibber/sensor.py index 0760b5309a3..e1b4bfa873d 100644 --- a/homeassistant/components/tibber/sensor.py +++ b/homeassistant/components/tibber/sensor.py @@ -6,18 +6,11 @@ import datetime from datetime import timedelta import logging from random import randrange -from typing import Any, cast +from typing import Any import aiohttp import tibber -from homeassistant.components.recorder import get_instance -from homeassistant.components.recorder.models import StatisticData, StatisticMetaData -from homeassistant.components.recorder.statistics import ( - async_add_external_statistics, - get_last_statistics, - statistics_during_period, -) from homeassistant.components.sensor import ( SensorDeviceClass, SensorEntity, @@ -47,13 +40,11 @@ from homeassistant.helpers.typing import StateType from homeassistant.helpers.update_coordinator import ( CoordinatorEntity, DataUpdateCoordinator, - UpdateFailed, ) from homeassistant.util import Throttle, dt as dt_util from .const import DOMAIN as TIBBER_DOMAIN, MANUFACTURER - -FIVE_YEARS = 5 * 365 * 24 +from .coordinator import TibberDataCoordinator _LOGGER = logging.getLogger(__name__) @@ -444,7 +435,7 @@ class TibberSensorElPrice(TibberSensor): ]["estimatedAnnualConsumption"] -class TibberDataSensor(TibberSensor, CoordinatorEntity["TibberDataCoordinator"]): +class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]): """Representation of a Tibber sensor.""" def __init__( @@ -640,138 +631,3 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en _LOGGER.error(errors[0]) return None return self.data.get("data", {}).get("liveMeasurement") - - -class TibberDataCoordinator(DataUpdateCoordinator[None]): # pylint: disable=hass-enforce-coordinator-module - """Handle Tibber data and insert statistics.""" - - config_entry: ConfigEntry - - def __init__(self, hass: HomeAssistant, tibber_connection: tibber.Tibber) -> None: - """Initialize the data handler.""" - super().__init__( - hass, - _LOGGER, - name=f"Tibber {tibber_connection.name}", - update_interval=timedelta(minutes=20), - ) - self._tibber_connection = tibber_connection - - async def _async_update_data(self) -> None: - """Update data via API.""" - try: - await self._tibber_connection.fetch_consumption_data_active_homes() - await self._tibber_connection.fetch_production_data_active_homes() - await self._insert_statistics() - except tibber.RetryableHttpException as err: - raise UpdateFailed(f"Error communicating with API ({err.status})") from err - except tibber.FatalHttpException: - # Fatal error. Reload config entry to show correct error. - self.hass.async_create_task( - self.hass.config_entries.async_reload(self.config_entry.entry_id) - ) - - async def _insert_statistics(self) -> None: - """Insert Tibber statistics.""" - for home in self._tibber_connection.get_homes(): - sensors: list[tuple[str, bool, str]] = [] - if home.hourly_consumption_data: - sensors.append(("consumption", False, UnitOfEnergy.KILO_WATT_HOUR)) - sensors.append(("totalCost", False, home.currency)) - if home.hourly_production_data: - sensors.append(("production", True, UnitOfEnergy.KILO_WATT_HOUR)) - sensors.append(("profit", True, home.currency)) - - for sensor_type, is_production, unit in sensors: - statistic_id = ( - f"{TIBBER_DOMAIN}:energy_" - f"{sensor_type.lower()}_" - f"{home.home_id.replace('-', '')}" - ) - - last_stats = await get_instance(self.hass).async_add_executor_job( - get_last_statistics, self.hass, 1, statistic_id, True, set() - ) - - if not last_stats: - # First time we insert 5 years of data (if available) - hourly_data = await home.get_historic_data( - 5 * 365 * 24, production=is_production - ) - - _sum = 0.0 - last_stats_time = None - else: - # hourly_consumption/production_data contains the last 30 days - # of consumption/production data. - # We update the statistics with the last 30 days - # of data to handle corrections in the data. - hourly_data = ( - home.hourly_production_data - if is_production - else home.hourly_consumption_data - ) - - from_time = dt_util.parse_datetime(hourly_data[0]["from"]) - if from_time is None: - continue - start = from_time - timedelta(hours=1) - stat = await get_instance(self.hass).async_add_executor_job( - statistics_during_period, - self.hass, - start, - None, - {statistic_id}, - "hour", - None, - {"sum"}, - ) - if statistic_id in stat: - first_stat = stat[statistic_id][0] - _sum = cast(float, first_stat["sum"]) - last_stats_time = first_stat["start"] - else: - hourly_data = await home.get_historic_data( - FIVE_YEARS, production=is_production - ) - _sum = 0.0 - last_stats_time = None - - statistics = [] - - last_stats_time_dt = ( - dt_util.utc_from_timestamp(last_stats_time) - if last_stats_time - else None - ) - - for data in hourly_data: - if data.get(sensor_type) is None: - continue - - from_time = dt_util.parse_datetime(data["from"]) - if from_time is None or ( - last_stats_time_dt is not None - and from_time <= last_stats_time_dt - ): - continue - - _sum += data[sensor_type] - - statistics.append( - StatisticData( - start=from_time, - state=data[sensor_type], - sum=_sum, - ) - ) - - metadata = StatisticMetaData( - has_mean=False, - has_sum=True, - name=f"{home.name} {sensor_type}", - source=TIBBER_DOMAIN, - statistic_id=statistic_id, - unit_of_measurement=unit, - ) - async_add_external_statistics(self.hass, metadata, statistics) diff --git a/tests/components/tibber/test_statistics.py b/tests/components/tibber/test_statistics.py index d6c510a8785..d817c9612aa 100644 --- a/tests/components/tibber/test_statistics.py +++ b/tests/components/tibber/test_statistics.py @@ -4,7 +4,7 @@ from unittest.mock import AsyncMock from homeassistant.components.recorder import Recorder from homeassistant.components.recorder.statistics import statistics_during_period -from homeassistant.components.tibber.sensor import TibberDataCoordinator +from homeassistant.components.tibber.coordinator import TibberDataCoordinator from homeassistant.core import HomeAssistant from homeassistant.util import dt as dt_util