Robert Resch e3f2f30395
Add circular mean statistics and sensor state class MEASUREMENT_ANGLE (#138453)
* Add circular mean statistics

* fixes

* Add has_circular_mean and fix tests

* Fix mypy

* Rename to MEASUREMENT_ANGLE

* Fix kitchen_sink tests

* Fix sensor tests

* for testing only

* Revert ws command change

* Apply suggestions

* test only

* add custom handling for postgres

* fix recursion limit

* Check if column is already available

* Set default false and not nullable for has_circular_mean

* Proper fix to be backwards compatible

* Fix value is None

* Align with schema

* Remove has_circular_mean from test schemas as it's not required anymore

* fix wrong column type

* Use correct variable to reduce stats

* Add guard that the uom is matching a valid one from the state class

* Add some tests

* Fix tests again

* Use mean_type in StatisticsMeta to differentiate between different mean type algorithms

* Fix leftovers

* Fix kitchen_sink tests

* Fix postgres

* Add circular mean test

* Add mean_type_changed stats issue

* Align the attributes with unit_changed

* Fix mean_type_change stats issue

* Add missing sensor recorder tests

* Add test_statistic_during_period_circular_mean

* Add mean_weight

* Add test_statistic_during_period_hole_circular_mean

* Use separate migration step to null has_mean

* Typo ARITHMETIC

* Implement requested changes

* Implement requested changes

* Split into #141444

* Add StatisticMeanType.NONE and forbid that mean_type can be None

* Fix mean_type

* Implement requested changes

* Small leftover of latest StatisticMeanType changes
2025-03-26 13:15:58 +01:00

174 lines
6.4 KiB
Python

"""Coordinator for Tibber sensors."""
from __future__ import annotations
from datetime import timedelta
import logging
from typing import cast
import tibber
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import (
StatisticData,
StatisticMeanType,
StatisticMetaData,
)
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
statistics_during_period,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfEnergy
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from .const import DOMAIN as TIBBER_DOMAIN
FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
class TibberDataCoordinator(DataUpdateCoordinator[None]):
    """Handle Tibber data and insert statistics."""

    config_entry: ConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        config_entry: ConfigEntry,
        tibber_connection: tibber.Tibber,
    ) -> None:
        """Initialize the data handler.

        Polls the Tibber API every 20 minutes and pushes the results into
        the recorder as external statistics.
        """
        super().__init__(
            hass,
            _LOGGER,
            config_entry=config_entry,
            name=f"Tibber {tibber_connection.name}",
            update_interval=timedelta(minutes=20),
        )
        self._tibber_connection = tibber_connection

    async def _async_update_data(self) -> None:
        """Update data via API.

        Raises:
            UpdateFailed: on a retryable HTTP error from the Tibber API.
        """
        try:
            await self._tibber_connection.fetch_consumption_data_active_homes()
            await self._tibber_connection.fetch_production_data_active_homes()
            await self._insert_statistics()
        except tibber.RetryableHttpExceptionError as err:
            raise UpdateFailed(f"Error communicating with API ({err.status})") from err
        except tibber.FatalHttpExceptionError:
            # Fatal error. Reload config entry to show correct error.
            self.hass.async_create_task(
                self.hass.config_entries.async_reload(self.config_entry.entry_id)
            )

    async def _insert_statistics(self) -> None:
        """Insert Tibber statistics into the recorder.

        For each home, builds a list of (sensor_type, is_production, unit)
        tuples and inserts/updates long-term statistics per sensor. On the
        first run the full FIVE_YEARS window is imported; afterwards only
        the recent hourly data (last ~30 days) is reconciled so upstream
        corrections are picked up.
        """
        for home in self._tibber_connection.get_homes():
            sensors: list[tuple[str, bool, str]] = []
            if home.hourly_consumption_data:
                sensors.append(("consumption", False, UnitOfEnergy.KILO_WATT_HOUR))
                sensors.append(("totalCost", False, home.currency))
            if home.hourly_production_data:
                sensors.append(("production", True, UnitOfEnergy.KILO_WATT_HOUR))
                sensors.append(("profit", True, home.currency))

            for sensor_type, is_production, unit in sensors:
                statistic_id = (
                    f"{TIBBER_DOMAIN}:energy_"
                    f"{sensor_type.lower()}_"
                    f"{home.home_id.replace('-', '')}"
                )

                last_stats = await get_instance(self.hass).async_add_executor_job(
                    get_last_statistics, self.hass, 1, statistic_id, True, set()
                )

                if not last_stats:
                    # First time we insert 5 years of data (if available).
                    # Use the FIVE_YEARS constant (was a duplicated literal
                    # 5 * 365 * 24, inconsistent with the branch below).
                    hourly_data = await home.get_historic_data(
                        FIVE_YEARS, production=is_production
                    )
                    _sum = 0.0
                    last_stats_time = None
                else:
                    # hourly_consumption/production_data contains the last 30 days
                    # of consumption/production data.
                    # We update the statistics with the last 30 days
                    # of data to handle corrections in the data.
                    hourly_data = (
                        home.hourly_production_data
                        if is_production
                        else home.hourly_consumption_data
                    )
                    # Guard against an empty list before indexing [0].
                    if not hourly_data:
                        continue
                    from_time = dt_util.parse_datetime(hourly_data[0]["from"])
                    if from_time is None:
                        continue
                    # Look one hour back so the hour preceding our data
                    # provides the running sum to continue from.
                    start = from_time - timedelta(hours=1)
                    stat = await get_instance(self.hass).async_add_executor_job(
                        statistics_during_period,
                        self.hass,
                        start,
                        None,
                        {statistic_id},
                        "hour",
                        None,
                        {"sum"},
                    )
                    if statistic_id in stat:
                        first_stat = stat[statistic_id][0]
                        _sum = cast(float, first_stat["sum"])
                        last_stats_time = first_stat["start"]
                    else:
                        # No stored statistics found for the window: fall
                        # back to a full historic import.
                        hourly_data = await home.get_historic_data(
                            FIVE_YEARS, production=is_production
                        )
                        _sum = 0.0
                        last_stats_time = None

                statistics = []

                last_stats_time_dt = (
                    dt_util.utc_from_timestamp(last_stats_time)
                    if last_stats_time
                    else None
                )

                for data in hourly_data:
                    if data.get(sensor_type) is None:
                        continue

                    from_time = dt_util.parse_datetime(data["from"])
                    # Skip unparsable timestamps and hours already recorded.
                    if from_time is None or (
                        last_stats_time_dt is not None
                        and from_time <= last_stats_time_dt
                    ):
                        continue

                    _sum += data[sensor_type]

                    statistics.append(
                        StatisticData(
                            start=from_time,
                            state=data[sensor_type],
                            sum=_sum,
                        )
                    )

                metadata = StatisticMetaData(
                    mean_type=StatisticMeanType.NONE,
                    has_sum=True,
                    name=f"{home.name} {sensor_type}",
                    source=TIBBER_DOMAIN,
                    statistic_id=statistic_id,
                    unit_of_measurement=unit,
                )
                async_add_external_statistics(self.hass, metadata, statistics)