Change to module function in statistics (#132648)

This commit is contained in:
G Johansson 2024-12-09 15:23:21 +01:00 committed by GitHub
parent 72de5d0fa3
commit 74eddce3d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 375 additions and 264 deletions

View File

@ -57,9 +57,9 @@ async def get_state_characteristics(handler: SchemaCommonFlowHandler) -> vol.Sch
split_entity_id(handler.options[CONF_ENTITY_ID])[0] == BINARY_SENSOR_DOMAIN
)
if is_binary:
options = STATS_BINARY_SUPPORT
options = list(STATS_BINARY_SUPPORT)
else:
options = STATS_NUMERIC_SUPPORT
options = list(STATS_NUMERIC_SUPPORT)
return vol.Schema(
{

View File

@ -53,7 +53,7 @@ from homeassistant.helpers.event import (
async_track_state_report_event,
)
from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util
from homeassistant.util.enum import try_parse_enum
@ -97,47 +97,379 @@ STAT_VALUE_MAX = "value_max"
STAT_VALUE_MIN = "value_min"
STAT_VARIANCE = "variance"
def _callable_characteristic_fn(
characteristic: str, binary: bool
) -> Callable[
[deque[bool | float], deque[datetime], int], float | int | datetime | None
]:
"""Return the function callable of one characteristic function."""
Callable[[deque[bool | float], deque[datetime], int], datetime | int | float | None]
if binary:
return STATS_BINARY_SUPPORT[characteristic]
return STATS_NUMERIC_SUPPORT[characteristic]
# Statistics for numeric sensor
def _stat_average_linear(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return states[0]
if len(states) >= 2:
area: float = 0
for i in range(1, len(states)):
area += (
0.5
* (states[i] + states[i - 1])
* (ages[i] - ages[i - 1]).total_seconds()
)
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
return area / age_range_seconds
return None
def _stat_average_step(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return states[0]
if len(states) >= 2:
area: float = 0
for i in range(1, len(states)):
area += states[i - 1] * (ages[i] - ages[i - 1]).total_seconds()
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
return area / age_range_seconds
return None
def _stat_average_timeless(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
return _stat_mean(states, ages, percentile)
def _stat_change(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return states[-1] - states[0]
return None
def _stat_change_sample(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 1:
return (states[-1] - states[0]) / (len(states) - 1)
return None
def _stat_change_second(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 1:
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
if age_range_seconds > 0:
return (states[-1] - states[0]) / age_range_seconds
return None
def _stat_count(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> int | None:
return len(states)
def _stat_datetime_newest(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> datetime | None:
if len(states) > 0:
return ages[-1]
return None
def _stat_datetime_oldest(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> datetime | None:
if len(states) > 0:
return ages[0]
return None
def _stat_datetime_value_max(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> datetime | None:
if len(states) > 0:
return ages[states.index(max(states))]
return None
def _stat_datetime_value_min(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> datetime | None:
if len(states) > 0:
return ages[states.index(min(states))]
return None
def _stat_distance_95_percent_of_values(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) >= 1:
return (
2 * 1.96 * cast(float, _stat_standard_deviation(states, ages, percentile))
)
return None
def _stat_distance_99_percent_of_values(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) >= 1:
return (
2 * 2.58 * cast(float, _stat_standard_deviation(states, ages, percentile))
)
return None
def _stat_distance_absolute(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return max(states) - min(states)
return None
def _stat_mean(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return statistics.mean(states)
return None
def _stat_mean_circular(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
sin_sum = sum(math.sin(math.radians(x)) for x in states)
cos_sum = sum(math.cos(math.radians(x)) for x in states)
return (math.degrees(math.atan2(sin_sum, cos_sum)) + 360) % 360
return None
def _stat_median(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return statistics.median(states)
return None
def _stat_noisiness(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
if len(states) >= 2:
return cast(float, _stat_sum_differences(states, ages, percentile)) / (
len(states) - 1
)
return None
def _stat_percentile(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return states[0]
if len(states) >= 2:
percentiles = statistics.quantiles(states, n=100, method="exclusive")
return percentiles[percentile - 1]
return None
def _stat_standard_deviation(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
if len(states) >= 2:
return statistics.stdev(states)
return None
def _stat_sum(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return sum(states)
return None
def _stat_sum_differences(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
if len(states) >= 2:
return sum(
abs(j - i) for i, j in zip(list(states), list(states)[1:], strict=False)
)
return None
def _stat_sum_differences_nonnegative(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
if len(states) >= 2:
return sum(
(j - i if j >= i else j - 0)
for i, j in zip(list(states), list(states)[1:], strict=False)
)
return None
def _stat_total(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
return _stat_sum(states, ages, percentile)
def _stat_value_max(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return max(states)
return None
def _stat_value_min(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return min(states)
return None
def _stat_variance(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return 0.0
if len(states) >= 2:
return statistics.variance(states)
return None
# Statistics for binary sensor
def _stat_binary_average_step(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) == 1:
return 100.0 * int(states[0] is True)
if len(states) >= 2:
on_seconds: float = 0
for i in range(1, len(states)):
if states[i - 1] is True:
on_seconds += (ages[i] - ages[i - 1]).total_seconds()
age_range_seconds = (ages[-1] - ages[0]).total_seconds()
return 100 / age_range_seconds * on_seconds
return None
def _stat_binary_average_timeless(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
return _stat_binary_mean(states, ages, percentile)
def _stat_binary_count(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> int | None:
return len(states)
def _stat_binary_count_on(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> int | None:
return states.count(True)
def _stat_binary_count_off(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> int | None:
return states.count(False)
def _stat_binary_datetime_newest(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> datetime | None:
return _stat_datetime_newest(states, ages, percentile)
def _stat_binary_datetime_oldest(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> datetime | None:
return _stat_datetime_oldest(states, ages, percentile)
def _stat_binary_mean(
states: deque[bool | float], ages: deque[datetime], percentile: int
) -> float | None:
if len(states) > 0:
return 100.0 / len(states) * states.count(True)
return None
# Statistics supported by a sensor source (numeric)
STATS_NUMERIC_SUPPORT = {
STAT_AVERAGE_LINEAR,
STAT_AVERAGE_STEP,
STAT_AVERAGE_TIMELESS,
STAT_CHANGE_SAMPLE,
STAT_CHANGE_SECOND,
STAT_CHANGE,
STAT_COUNT,
STAT_DATETIME_NEWEST,
STAT_DATETIME_OLDEST,
STAT_DATETIME_VALUE_MAX,
STAT_DATETIME_VALUE_MIN,
STAT_DISTANCE_95P,
STAT_DISTANCE_99P,
STAT_DISTANCE_ABSOLUTE,
STAT_MEAN,
STAT_MEAN_CIRCULAR,
STAT_MEDIAN,
STAT_NOISINESS,
STAT_PERCENTILE,
STAT_STANDARD_DEVIATION,
STAT_SUM,
STAT_SUM_DIFFERENCES,
STAT_SUM_DIFFERENCES_NONNEGATIVE,
STAT_TOTAL,
STAT_VALUE_MAX,
STAT_VALUE_MIN,
STAT_VARIANCE,
STAT_AVERAGE_LINEAR: _stat_average_linear,
STAT_AVERAGE_STEP: _stat_average_step,
STAT_AVERAGE_TIMELESS: _stat_average_timeless,
STAT_CHANGE_SAMPLE: _stat_change_sample,
STAT_CHANGE_SECOND: _stat_change_second,
STAT_CHANGE: _stat_change,
STAT_COUNT: _stat_count,
STAT_DATETIME_NEWEST: _stat_datetime_newest,
STAT_DATETIME_OLDEST: _stat_datetime_oldest,
STAT_DATETIME_VALUE_MAX: _stat_datetime_value_max,
STAT_DATETIME_VALUE_MIN: _stat_datetime_value_min,
STAT_DISTANCE_95P: _stat_distance_95_percent_of_values,
STAT_DISTANCE_99P: _stat_distance_99_percent_of_values,
STAT_DISTANCE_ABSOLUTE: _stat_distance_absolute,
STAT_MEAN: _stat_mean,
STAT_MEAN_CIRCULAR: _stat_mean_circular,
STAT_MEDIAN: _stat_median,
STAT_NOISINESS: _stat_noisiness,
STAT_PERCENTILE: _stat_percentile,
STAT_STANDARD_DEVIATION: _stat_standard_deviation,
STAT_SUM: _stat_sum,
STAT_SUM_DIFFERENCES: _stat_sum_differences,
STAT_SUM_DIFFERENCES_NONNEGATIVE: _stat_sum_differences_nonnegative,
STAT_TOTAL: _stat_total,
STAT_VALUE_MAX: _stat_value_max,
STAT_VALUE_MIN: _stat_value_min,
STAT_VARIANCE: _stat_variance,
}
# Statistics supported by a binary_sensor source
STATS_BINARY_SUPPORT = {
STAT_AVERAGE_STEP,
STAT_AVERAGE_TIMELESS,
STAT_COUNT,
STAT_COUNT_BINARY_ON,
STAT_COUNT_BINARY_OFF,
STAT_DATETIME_NEWEST,
STAT_DATETIME_OLDEST,
STAT_MEAN,
STAT_AVERAGE_STEP: _stat_binary_average_step,
STAT_AVERAGE_TIMELESS: _stat_binary_average_timeless,
STAT_COUNT: _stat_binary_count,
STAT_COUNT_BINARY_ON: _stat_binary_count_on,
STAT_COUNT_BINARY_OFF: _stat_binary_count_off,
STAT_DATETIME_NEWEST: _stat_binary_datetime_newest,
STAT_DATETIME_OLDEST: _stat_binary_datetime_oldest,
STAT_MEAN: _stat_binary_mean,
}
STATS_NOT_A_NUMBER = {
@ -366,9 +698,10 @@ class StatisticsSensor(SensorEntity):
self.ages: deque[datetime] = deque(maxlen=self._samples_max_buffer_size)
self._attr_extra_state_attributes = {}
self._state_characteristic_fn: Callable[[], float | int | datetime | None] = (
self._callable_characteristic_fn(self._state_characteristic)
)
self._state_characteristic_fn: Callable[
[deque[bool | float], deque[datetime], int],
float | int | datetime | None,
] = _callable_characteristic_fn(self._state_characteristic, self.is_binary)
self._update_listener: CALLBACK_TYPE | None = None
self._preview_callback: Callable[[str, Mapping[str, Any]], None] | None = None
@ -754,7 +1087,7 @@ class StatisticsSensor(SensorEntity):
One of the _stat_*() functions is represented by self._state_characteristic_fn().
"""
value = self._state_characteristic_fn()
value = self._state_characteristic_fn(self.states, self.ages, self._percentile)
_LOGGER.debug(
"Updating value: states: %s, ages: %s => %s", self.states, self.ages, value
)
@ -764,225 +1097,3 @@ class StatisticsSensor(SensorEntity):
if self._precision == 0:
value = int(value)
self._attr_native_value = value
def _callable_characteristic_fn(
self, characteristic: str
) -> Callable[[], float | int | datetime | None]:
"""Return the function callable of one characteristic function."""
function: Callable[[], float | int | datetime | None] = getattr(
self,
f"_stat_binary_{characteristic}"
if self.is_binary
else f"_stat_{characteristic}",
)
return function
# Statistics for numeric sensor
def _stat_average_linear(self) -> StateType:
if len(self.states) == 1:
return self.states[0]
if len(self.states) >= 2:
area: float = 0
for i in range(1, len(self.states)):
area += (
0.5
* (self.states[i] + self.states[i - 1])
* (self.ages[i] - self.ages[i - 1]).total_seconds()
)
age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds()
return area / age_range_seconds
return None
def _stat_average_step(self) -> StateType:
if len(self.states) == 1:
return self.states[0]
if len(self.states) >= 2:
area: float = 0
for i in range(1, len(self.states)):
area += (
self.states[i - 1]
* (self.ages[i] - self.ages[i - 1]).total_seconds()
)
age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds()
return area / age_range_seconds
return None
def _stat_average_timeless(self) -> StateType:
return self._stat_mean()
def _stat_change(self) -> StateType:
if len(self.states) > 0:
return self.states[-1] - self.states[0]
return None
def _stat_change_sample(self) -> StateType:
if len(self.states) > 1:
return (self.states[-1] - self.states[0]) / (len(self.states) - 1)
return None
def _stat_change_second(self) -> StateType:
if len(self.states) > 1:
age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds()
if age_range_seconds > 0:
return (self.states[-1] - self.states[0]) / age_range_seconds
return None
def _stat_count(self) -> StateType:
return len(self.states)
def _stat_datetime_newest(self) -> datetime | None:
if len(self.states) > 0:
return self.ages[-1]
return None
def _stat_datetime_oldest(self) -> datetime | None:
if len(self.states) > 0:
return self.ages[0]
return None
def _stat_datetime_value_max(self) -> datetime | None:
if len(self.states) > 0:
return self.ages[self.states.index(max(self.states))]
return None
def _stat_datetime_value_min(self) -> datetime | None:
if len(self.states) > 0:
return self.ages[self.states.index(min(self.states))]
return None
def _stat_distance_95_percent_of_values(self) -> StateType:
if len(self.states) >= 1:
return 2 * 1.96 * cast(float, self._stat_standard_deviation())
return None
def _stat_distance_99_percent_of_values(self) -> StateType:
if len(self.states) >= 1:
return 2 * 2.58 * cast(float, self._stat_standard_deviation())
return None
def _stat_distance_absolute(self) -> StateType:
if len(self.states) > 0:
return max(self.states) - min(self.states)
return None
def _stat_mean(self) -> StateType:
if len(self.states) > 0:
return statistics.mean(self.states)
return None
def _stat_mean_circular(self) -> StateType:
if len(self.states) > 0:
sin_sum = sum(math.sin(math.radians(x)) for x in self.states)
cos_sum = sum(math.cos(math.radians(x)) for x in self.states)
return (math.degrees(math.atan2(sin_sum, cos_sum)) + 360) % 360
return None
def _stat_median(self) -> StateType:
if len(self.states) > 0:
return statistics.median(self.states)
return None
def _stat_noisiness(self) -> StateType:
if len(self.states) == 1:
return 0.0
if len(self.states) >= 2:
return cast(float, self._stat_sum_differences()) / (len(self.states) - 1)
return None
def _stat_percentile(self) -> StateType:
if len(self.states) == 1:
return self.states[0]
if len(self.states) >= 2:
percentiles = statistics.quantiles(self.states, n=100, method="exclusive")
return percentiles[self._percentile - 1]
return None
def _stat_standard_deviation(self) -> StateType:
if len(self.states) == 1:
return 0.0
if len(self.states) >= 2:
return statistics.stdev(self.states)
return None
def _stat_sum(self) -> StateType:
if len(self.states) > 0:
return sum(self.states)
return None
def _stat_sum_differences(self) -> StateType:
if len(self.states) == 1:
return 0.0
if len(self.states) >= 2:
return sum(
abs(j - i)
for i, j in zip(list(self.states), list(self.states)[1:], strict=False)
)
return None
def _stat_sum_differences_nonnegative(self) -> StateType:
if len(self.states) == 1:
return 0.0
if len(self.states) >= 2:
return sum(
(j - i if j >= i else j - 0)
for i, j in zip(list(self.states), list(self.states)[1:], strict=False)
)
return None
def _stat_total(self) -> StateType:
return self._stat_sum()
def _stat_value_max(self) -> StateType:
if len(self.states) > 0:
return max(self.states)
return None
def _stat_value_min(self) -> StateType:
if len(self.states) > 0:
return min(self.states)
return None
def _stat_variance(self) -> StateType:
if len(self.states) == 1:
return 0.0
if len(self.states) >= 2:
return statistics.variance(self.states)
return None
# Statistics for binary sensor
def _stat_binary_average_step(self) -> StateType:
if len(self.states) == 1:
return 100.0 * int(self.states[0] is True)
if len(self.states) >= 2:
on_seconds: float = 0
for i in range(1, len(self.states)):
if self.states[i - 1] is True:
on_seconds += (self.ages[i] - self.ages[i - 1]).total_seconds()
age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds()
return 100 / age_range_seconds * on_seconds
return None
def _stat_binary_average_timeless(self) -> StateType:
return self._stat_binary_mean()
def _stat_binary_count(self) -> StateType:
return len(self.states)
def _stat_binary_count_on(self) -> StateType:
return self.states.count(True)
def _stat_binary_count_off(self) -> StateType:
return self.states.count(False)
def _stat_binary_datetime_newest(self) -> datetime | None:
return self._stat_datetime_newest()
def _stat_binary_datetime_oldest(self) -> datetime | None:
return self._stat_datetime_oldest()
def _stat_binary_mean(self) -> StateType:
if len(self.states) > 0:
return 100.0 / len(self.states) * self.states.count(True)
return None