Move TibberDataCoordinator to separate module (#118027)

This commit is contained in:
epenet 2024-05-24 11:05:54 +02:00 committed by GitHub
parent 95840a031a
commit d4df86da06
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 168 additions and 148 deletions

View File

@ -1428,6 +1428,7 @@ omit =
homeassistant/components/thinkingcleaner/* homeassistant/components/thinkingcleaner/*
homeassistant/components/thomson/device_tracker.py homeassistant/components/thomson/device_tracker.py
homeassistant/components/tibber/__init__.py homeassistant/components/tibber/__init__.py
homeassistant/components/tibber/coordinator.py
homeassistant/components/tibber/sensor.py homeassistant/components/tibber/sensor.py
homeassistant/components/tikteck/light.py homeassistant/components/tikteck/light.py
homeassistant/components/tile/__init__.py homeassistant/components/tile/__init__.py

View File

@ -0,0 +1,163 @@
"""Coordinator for Tibber sensors."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import cast
import tibber
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
statistics_during_period,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfEnergy
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import DOMAIN as TIBBER_DOMAIN
FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
config_entry: ConfigEntry
def __init__(self, hass: HomeAssistant, tibber_connection: tibber.Tibber) -> None:
"""Initialize the data handler."""
super().__init__(
hass,
_LOGGER,
name=f"Tibber {tibber_connection.name}",
update_interval=timedelta(minutes=20),
)
self._tibber_connection = tibber_connection
async def _async_update_data(self) -> None:
"""Update data via API."""
try:
await self._tibber_connection.fetch_consumption_data_active_homes()
await self._tibber_connection.fetch_production_data_active_homes()
await self._insert_statistics()
except tibber.RetryableHttpException as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
except tibber.FatalHttpException:
# Fatal error. Reload config entry to show correct error.
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
async def _insert_statistics(self) -> None:
"""Insert Tibber statistics."""
for home in self._tibber_connection.get_homes():
sensors: list[tuple[str, bool, str]] = []
if home.hourly_consumption_data:
sensors.append(("consumption", False, UnitOfEnergy.KILO_WATT_HOUR))
sensors.append(("totalCost", False, home.currency))
if home.hourly_production_data:
sensors.append(("production", True, UnitOfEnergy.KILO_WATT_HOUR))
sensors.append(("profit", True, home.currency))
for sensor_type, is_production, unit in sensors:
statistic_id = (
f"{TIBBER_DOMAIN}:energy_"
f"{sensor_type.lower()}_"
f"{home.home_id.replace('-', '')}"
)
last_stats = await get_instance(self.hass).async_add_executor_job(
get_last_statistics, self.hass, 1, statistic_id, True, set()
)
if not last_stats:
# First time we insert 5 years of data (if available)
hourly_data = await home.get_historic_data(
5 * 365 * 24, production=is_production
)
_sum = 0.0
last_stats_time = None
else:
# hourly_consumption/production_data contains the last 30 days
# of consumption/production data.
# We update the statistics with the last 30 days
# of data to handle corrections in the data.
hourly_data = (
home.hourly_production_data
if is_production
else home.hourly_consumption_data
)
from_time = dt_util.parse_datetime(hourly_data[0]["from"])
if from_time is None:
continue
start = from_time - timedelta(hours=1)
stat = await get_instance(self.hass).async_add_executor_job(
statistics_during_period,
self.hass,
start,
None,
{statistic_id},
"hour",
None,
{"sum"},
)
if statistic_id in stat:
first_stat = stat[statistic_id][0]
_sum = cast(float, first_stat["sum"])
last_stats_time = first_stat["start"]
else:
hourly_data = await home.get_historic_data(
FIVE_YEARS, production=is_production
)
_sum = 0.0
last_stats_time = None
statistics = []
last_stats_time_dt = (
dt_util.utc_from_timestamp(last_stats_time)
if last_stats_time
else None
)
for data in hourly_data:
if data.get(sensor_type) is None:
continue
from_time = dt_util.parse_datetime(data["from"])
if from_time is None or (
last_stats_time_dt is not None
and from_time <= last_stats_time_dt
):
continue
_sum += data[sensor_type]
statistics.append(
StatisticData(
start=from_time,
state=data[sensor_type],
sum=_sum,
)
)
metadata = StatisticMetaData(
has_mean=False,
has_sum=True,
name=f"{home.name} {sensor_type}",
source=TIBBER_DOMAIN,
statistic_id=statistic_id,
unit_of_measurement=unit,
)
async_add_external_statistics(self.hass, metadata, statistics)

View File

@ -6,18 +6,11 @@ import datetime
from datetime import timedelta from datetime import timedelta
import logging import logging
from random import randrange from random import randrange
from typing import Any, cast from typing import Any
import aiohttp import aiohttp
import tibber import tibber
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
statistics_during_period,
)
from homeassistant.components.sensor import ( from homeassistant.components.sensor import (
SensorDeviceClass, SensorDeviceClass,
SensorEntity, SensorEntity,
@ -47,13 +40,11 @@ from homeassistant.helpers.typing import StateType
from homeassistant.helpers.update_coordinator import ( from homeassistant.helpers.update_coordinator import (
CoordinatorEntity, CoordinatorEntity,
DataUpdateCoordinator, DataUpdateCoordinator,
UpdateFailed,
) )
from homeassistant.util import Throttle, dt as dt_util from homeassistant.util import Throttle, dt as dt_util
from .const import DOMAIN as TIBBER_DOMAIN, MANUFACTURER from .const import DOMAIN as TIBBER_DOMAIN, MANUFACTURER
from .coordinator import TibberDataCoordinator
FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -444,7 +435,7 @@ class TibberSensorElPrice(TibberSensor):
]["estimatedAnnualConsumption"] ]["estimatedAnnualConsumption"]
class TibberDataSensor(TibberSensor, CoordinatorEntity["TibberDataCoordinator"]): class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
"""Representation of a Tibber sensor.""" """Representation of a Tibber sensor."""
def __init__( def __init__(
@ -640,138 +631,3 @@ class TibberRtDataCoordinator(DataUpdateCoordinator): # pylint: disable=hass-en
_LOGGER.error(errors[0]) _LOGGER.error(errors[0])
return None return None
return self.data.get("data", {}).get("liveMeasurement") return self.data.get("data", {}).get("liveMeasurement")
class TibberDataCoordinator(DataUpdateCoordinator[None]): # pylint: disable=hass-enforce-coordinator-module
"""Handle Tibber data and insert statistics."""
config_entry: ConfigEntry
def __init__(self, hass: HomeAssistant, tibber_connection: tibber.Tibber) -> None:
"""Initialize the data handler."""
super().__init__(
hass,
_LOGGER,
name=f"Tibber {tibber_connection.name}",
update_interval=timedelta(minutes=20),
)
self._tibber_connection = tibber_connection
async def _async_update_data(self) -> None:
"""Update data via API."""
try:
await self._tibber_connection.fetch_consumption_data_active_homes()
await self._tibber_connection.fetch_production_data_active_homes()
await self._insert_statistics()
except tibber.RetryableHttpException as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
except tibber.FatalHttpException:
# Fatal error. Reload config entry to show correct error.
self.hass.async_create_task(
self.hass.config_entries.async_reload(self.config_entry.entry_id)
)
async def _insert_statistics(self) -> None:
"""Insert Tibber statistics."""
for home in self._tibber_connection.get_homes():
sensors: list[tuple[str, bool, str]] = []
if home.hourly_consumption_data:
sensors.append(("consumption", False, UnitOfEnergy.KILO_WATT_HOUR))
sensors.append(("totalCost", False, home.currency))
if home.hourly_production_data:
sensors.append(("production", True, UnitOfEnergy.KILO_WATT_HOUR))
sensors.append(("profit", True, home.currency))
for sensor_type, is_production, unit in sensors:
statistic_id = (
f"{TIBBER_DOMAIN}:energy_"
f"{sensor_type.lower()}_"
f"{home.home_id.replace('-', '')}"
)
last_stats = await get_instance(self.hass).async_add_executor_job(
get_last_statistics, self.hass, 1, statistic_id, True, set()
)
if not last_stats:
# First time we insert 5 years of data (if available)
hourly_data = await home.get_historic_data(
5 * 365 * 24, production=is_production
)
_sum = 0.0
last_stats_time = None
else:
# hourly_consumption/production_data contains the last 30 days
# of consumption/production data.
# We update the statistics with the last 30 days
# of data to handle corrections in the data.
hourly_data = (
home.hourly_production_data
if is_production
else home.hourly_consumption_data
)
from_time = dt_util.parse_datetime(hourly_data[0]["from"])
if from_time is None:
continue
start = from_time - timedelta(hours=1)
stat = await get_instance(self.hass).async_add_executor_job(
statistics_during_period,
self.hass,
start,
None,
{statistic_id},
"hour",
None,
{"sum"},
)
if statistic_id in stat:
first_stat = stat[statistic_id][0]
_sum = cast(float, first_stat["sum"])
last_stats_time = first_stat["start"]
else:
hourly_data = await home.get_historic_data(
FIVE_YEARS, production=is_production
)
_sum = 0.0
last_stats_time = None
statistics = []
last_stats_time_dt = (
dt_util.utc_from_timestamp(last_stats_time)
if last_stats_time
else None
)
for data in hourly_data:
if data.get(sensor_type) is None:
continue
from_time = dt_util.parse_datetime(data["from"])
if from_time is None or (
last_stats_time_dt is not None
and from_time <= last_stats_time_dt
):
continue
_sum += data[sensor_type]
statistics.append(
StatisticData(
start=from_time,
state=data[sensor_type],
sum=_sum,
)
)
metadata = StatisticMetaData(
has_mean=False,
has_sum=True,
name=f"{home.name} {sensor_type}",
source=TIBBER_DOMAIN,
statistic_id=statistic_id,
unit_of_measurement=unit,
)
async_add_external_statistics(self.hass, metadata, statistics)

View File

@ -4,7 +4,7 @@ from unittest.mock import AsyncMock
from homeassistant.components.recorder import Recorder from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.statistics import statistics_during_period from homeassistant.components.recorder.statistics import statistics_during_period
from homeassistant.components.tibber.sensor import TibberDataCoordinator from homeassistant.components.tibber.coordinator import TibberDataCoordinator
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util