diff --git a/homeassistant/components/statistics/config_flow.py b/homeassistant/components/statistics/config_flow.py index 4280c92131a..4c78afbde9c 100644 --- a/homeassistant/components/statistics/config_flow.py +++ b/homeassistant/components/statistics/config_flow.py @@ -57,9 +57,9 @@ async def get_state_characteristics(handler: SchemaCommonFlowHandler) -> vol.Sch split_entity_id(handler.options[CONF_ENTITY_ID])[0] == BINARY_SENSOR_DOMAIN ) if is_binary: - options = STATS_BINARY_SUPPORT + options = list(STATS_BINARY_SUPPORT) else: - options = STATS_NUMERIC_SUPPORT + options = list(STATS_NUMERIC_SUPPORT) return vol.Schema( { diff --git a/homeassistant/components/statistics/sensor.py b/homeassistant/components/statistics/sensor.py index b6f1844f774..8988e0cdd63 100644 --- a/homeassistant/components/statistics/sensor.py +++ b/homeassistant/components/statistics/sensor.py @@ -53,7 +53,7 @@ from homeassistant.helpers.event import ( async_track_state_report_event, ) from homeassistant.helpers.reload import async_setup_reload_service -from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.util import dt as dt_util from homeassistant.util.enum import try_parse_enum @@ -97,47 +97,379 @@ STAT_VALUE_MAX = "value_max" STAT_VALUE_MIN = "value_min" STAT_VARIANCE = "variance" + +def _callable_characteristic_fn( + characteristic: str, binary: bool +) -> Callable[ + [deque[bool | float], deque[datetime], int], float | int | datetime | None +]: + """Return the function callable of one characteristic function.""" + Callable[[deque[bool | float], deque[datetime], int], datetime | int | float | None] + if binary: + return STATS_BINARY_SUPPORT[characteristic] + return STATS_NUMERIC_SUPPORT[characteristic] + + +# Statistics for numeric sensor + + +def _stat_average_linear( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return states[0] + if len(states) >= 2: + area: float = 0 + for i in range(1, len(states)): + area += ( + 0.5 + * (states[i] + states[i - 1]) + * (ages[i] - ages[i - 1]).total_seconds() + ) + age_range_seconds = (ages[-1] - ages[0]).total_seconds() + return area / age_range_seconds + return None + + +def _stat_average_step( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return states[0] + if len(states) >= 2: + area: float = 0 + for i in range(1, len(states)): + area += states[i - 1] * (ages[i] - ages[i - 1]).total_seconds() + age_range_seconds = (ages[-1] - ages[0]).total_seconds() + return area / age_range_seconds + return None + + +def _stat_average_timeless( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + return _stat_mean(states, ages, percentile) + + +def _stat_change( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return states[-1] - states[0] + return None + + +def _stat_change_sample( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 1: + return (states[-1] - states[0]) / (len(states) - 1) + return None + + +def _stat_change_second( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 1: + age_range_seconds = (ages[-1] - ages[0]).total_seconds() + if age_range_seconds > 0: + return (states[-1] - states[0]) / age_range_seconds + return None + + +def _stat_count( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> int | None: + return len(states) + + +def _stat_datetime_newest( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> datetime | None: + if len(states) > 0: + return ages[-1] + return None + + +def _stat_datetime_oldest( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> datetime | None: + if len(states) > 0: + return ages[0] + return None + + +def _stat_datetime_value_max( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> datetime | None: + if len(states) > 0: + return ages[states.index(max(states))] + return None + + +def _stat_datetime_value_min( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> datetime | None: + if len(states) > 0: + return ages[states.index(min(states))] + return None + + +def _stat_distance_95_percent_of_values( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) >= 1: + return ( + 2 * 1.96 * cast(float, _stat_standard_deviation(states, ages, percentile)) + ) + return None + + +def _stat_distance_99_percent_of_values( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) >= 1: + return ( + 2 * 2.58 * cast(float, _stat_standard_deviation(states, ages, percentile)) + ) + return None + + +def _stat_distance_absolute( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return max(states) - min(states) + return None + + +def _stat_mean( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return statistics.mean(states) + return None + + +def _stat_mean_circular( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + sin_sum = sum(math.sin(math.radians(x)) for x in states) + cos_sum = sum(math.cos(math.radians(x)) for x in states) + return (math.degrees(math.atan2(sin_sum, cos_sum)) + 360) % 360 + return None + + +def _stat_median( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return statistics.median(states) + return None + + +def _stat_noisiness( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return 0.0 + if len(states) >= 2: + return cast(float, _stat_sum_differences(states, ages, percentile)) / ( + len(states) - 1 + ) + return None + + +def _stat_percentile( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return states[0] + if len(states) >= 2: + percentiles = statistics.quantiles(states, n=100, method="exclusive") + return percentiles[percentile - 1] + return None + + +def _stat_standard_deviation( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return 0.0 + if len(states) >= 2: + return statistics.stdev(states) + return None + + +def _stat_sum( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return sum(states) + return None + + +def _stat_sum_differences( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return 0.0 + if len(states) >= 2: + return sum( + abs(j - i) for i, j in zip(list(states), list(states)[1:], strict=False) + ) + return None + + +def _stat_sum_differences_nonnegative( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return 0.0 + if len(states) >= 2: + return sum( + (j - i if j >= i else j - 0) + for i, j in zip(list(states), list(states)[1:], strict=False) + ) + return None + + +def _stat_total( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + return _stat_sum(states, ages, percentile) + + +def _stat_value_max( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return max(states) + return None + + +def _stat_value_min( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return min(states) + return None + + +def _stat_variance( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return 0.0 + if len(states) >= 2: + return statistics.variance(states) + return None + + +# Statistics for binary sensor + + +def _stat_binary_average_step( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) == 1: + return 100.0 * int(states[0] is True) + if len(states) >= 2: + on_seconds: float = 0 + for i in range(1, len(states)): + if states[i - 1] is True: + on_seconds += (ages[i] - ages[i - 1]).total_seconds() + age_range_seconds = (ages[-1] - ages[0]).total_seconds() + return 100 / age_range_seconds * on_seconds + return None + + +def _stat_binary_average_timeless( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + return _stat_binary_mean(states, ages, percentile) + + +def _stat_binary_count( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> int | None: + return len(states) + + +def _stat_binary_count_on( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> int | None: + return states.count(True) + + +def _stat_binary_count_off( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> int | None: + return states.count(False) + + +def _stat_binary_datetime_newest( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> datetime | None: + return _stat_datetime_newest(states, ages, percentile) + + +def _stat_binary_datetime_oldest( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> datetime | None: + return _stat_datetime_oldest(states, ages, percentile) + + +def _stat_binary_mean( + states: deque[bool | float], ages: deque[datetime], percentile: int +) -> float | None: + if len(states) > 0: + return 100.0 / len(states) * states.count(True) + return None + + # Statistics supported by a sensor source (numeric) STATS_NUMERIC_SUPPORT = { - STAT_AVERAGE_LINEAR, - STAT_AVERAGE_STEP, - STAT_AVERAGE_TIMELESS, - STAT_CHANGE_SAMPLE, - STAT_CHANGE_SECOND, - STAT_CHANGE, - STAT_COUNT, - STAT_DATETIME_NEWEST, - STAT_DATETIME_OLDEST, - STAT_DATETIME_VALUE_MAX, - STAT_DATETIME_VALUE_MIN, - STAT_DISTANCE_95P, - STAT_DISTANCE_99P, - STAT_DISTANCE_ABSOLUTE, - STAT_MEAN, - STAT_MEAN_CIRCULAR, - STAT_MEDIAN, - STAT_NOISINESS, - STAT_PERCENTILE, - STAT_STANDARD_DEVIATION, - STAT_SUM, - STAT_SUM_DIFFERENCES, - STAT_SUM_DIFFERENCES_NONNEGATIVE, - STAT_TOTAL, - STAT_VALUE_MAX, - STAT_VALUE_MIN, - STAT_VARIANCE, + STAT_AVERAGE_LINEAR: _stat_average_linear, + STAT_AVERAGE_STEP: _stat_average_step, + STAT_AVERAGE_TIMELESS: _stat_average_timeless, + STAT_CHANGE_SAMPLE: _stat_change_sample, + STAT_CHANGE_SECOND: _stat_change_second, + STAT_CHANGE: _stat_change, + STAT_COUNT: _stat_count, + STAT_DATETIME_NEWEST: _stat_datetime_newest, + STAT_DATETIME_OLDEST: _stat_datetime_oldest, + STAT_DATETIME_VALUE_MAX: _stat_datetime_value_max, + STAT_DATETIME_VALUE_MIN: _stat_datetime_value_min, + STAT_DISTANCE_95P: _stat_distance_95_percent_of_values, + STAT_DISTANCE_99P: _stat_distance_99_percent_of_values, + STAT_DISTANCE_ABSOLUTE: _stat_distance_absolute, + STAT_MEAN: _stat_mean, + STAT_MEAN_CIRCULAR: _stat_mean_circular, + STAT_MEDIAN: _stat_median, + STAT_NOISINESS: _stat_noisiness, + STAT_PERCENTILE: _stat_percentile, + STAT_STANDARD_DEVIATION: _stat_standard_deviation, + STAT_SUM: _stat_sum, + STAT_SUM_DIFFERENCES: _stat_sum_differences, + STAT_SUM_DIFFERENCES_NONNEGATIVE: _stat_sum_differences_nonnegative, + STAT_TOTAL: _stat_total, + STAT_VALUE_MAX: _stat_value_max, + STAT_VALUE_MIN: _stat_value_min, + STAT_VARIANCE: _stat_variance, } # Statistics supported by a binary_sensor source STATS_BINARY_SUPPORT = { - STAT_AVERAGE_STEP, - STAT_AVERAGE_TIMELESS, - STAT_COUNT, - STAT_COUNT_BINARY_ON, - STAT_COUNT_BINARY_OFF, - STAT_DATETIME_NEWEST, - STAT_DATETIME_OLDEST, - STAT_MEAN, + STAT_AVERAGE_STEP: _stat_binary_average_step, + STAT_AVERAGE_TIMELESS: _stat_binary_average_timeless, + STAT_COUNT: _stat_binary_count, + STAT_COUNT_BINARY_ON: _stat_binary_count_on, + STAT_COUNT_BINARY_OFF: _stat_binary_count_off, + STAT_DATETIME_NEWEST: _stat_binary_datetime_newest, + STAT_DATETIME_OLDEST: _stat_binary_datetime_oldest, + STAT_MEAN: _stat_binary_mean, } STATS_NOT_A_NUMBER = { @@ -366,9 +698,10 @@ class StatisticsSensor(SensorEntity): self.ages: deque[datetime] = deque(maxlen=self._samples_max_buffer_size) self._attr_extra_state_attributes = {} - self._state_characteristic_fn: Callable[[], float | int | datetime | None] = ( - self._callable_characteristic_fn(self._state_characteristic) - ) + self._state_characteristic_fn: Callable[ + [deque[bool | float], deque[datetime], int], + float | int | datetime | None, + ] = _callable_characteristic_fn(self._state_characteristic, self.is_binary) self._update_listener: CALLBACK_TYPE | None = None self._preview_callback: Callable[[str, Mapping[str, Any]], None] | None = None @@ -754,7 +1087,7 @@ class StatisticsSensor(SensorEntity): One of the _stat_*() functions is represented by self._state_characteristic_fn(). """ - value = self._state_characteristic_fn() + value = self._state_characteristic_fn(self.states, self.ages, self._percentile) _LOGGER.debug( "Updating value: states: %s, ages: %s => %s", self.states, self.ages, value ) @@ -764,225 +1097,3 @@ class StatisticsSensor(SensorEntity): if self._precision == 0: value = int(value) self._attr_native_value = value - - def _callable_characteristic_fn( - self, characteristic: str - ) -> Callable[[], float | int | datetime | None]: - """Return the function callable of one characteristic function.""" - function: Callable[[], float | int | datetime | None] = getattr( - self, - f"_stat_binary_{characteristic}" - if self.is_binary - else f"_stat_{characteristic}", - ) - return function - - # Statistics for numeric sensor - - def _stat_average_linear(self) -> StateType: - if len(self.states) == 1: - return self.states[0] - if len(self.states) >= 2: - area: float = 0 - for i in range(1, len(self.states)): - area += ( - 0.5 - * (self.states[i] + self.states[i - 1]) - * (self.ages[i] - self.ages[i - 1]).total_seconds() - ) - age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds() - return area / age_range_seconds - return None - - def _stat_average_step(self) -> StateType: - if len(self.states) == 1: - return self.states[0] - if len(self.states) >= 2: - area: float = 0 - for i in range(1, len(self.states)): - area += ( - self.states[i - 1] - * (self.ages[i] - self.ages[i - 1]).total_seconds() - ) - age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds() - return area / age_range_seconds - return None - - def _stat_average_timeless(self) -> StateType: - return self._stat_mean() - - def _stat_change(self) -> StateType: - if len(self.states) > 0: - return self.states[-1] - self.states[0] - return None - - def _stat_change_sample(self) -> StateType: - if len(self.states) > 1: - return (self.states[-1] - self.states[0]) / (len(self.states) - 1) - return None - - def _stat_change_second(self) -> StateType: - if len(self.states) > 1: - age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds() - if age_range_seconds > 0: - return (self.states[-1] - self.states[0]) / age_range_seconds - return None - - def _stat_count(self) -> StateType: - return len(self.states) - - def _stat_datetime_newest(self) -> datetime | None: - if len(self.states) > 0: - return self.ages[-1] - return None - - def _stat_datetime_oldest(self) -> datetime | None: - if len(self.states) > 0: - return self.ages[0] - return None - - def _stat_datetime_value_max(self) -> datetime | None: - if len(self.states) > 0: - return self.ages[self.states.index(max(self.states))] - return None - - def _stat_datetime_value_min(self) -> datetime | None: - if len(self.states) > 0: - return self.ages[self.states.index(min(self.states))] - return None - - def _stat_distance_95_percent_of_values(self) -> StateType: - if len(self.states) >= 1: - return 2 * 1.96 * cast(float, self._stat_standard_deviation()) - return None - - def _stat_distance_99_percent_of_values(self) -> StateType: - if len(self.states) >= 1: - return 2 * 2.58 * cast(float, self._stat_standard_deviation()) - return None - - def _stat_distance_absolute(self) -> StateType: - if len(self.states) > 0: - return max(self.states) - min(self.states) - return None - - def _stat_mean(self) -> StateType: - if len(self.states) > 0: - return statistics.mean(self.states) - return None - - def _stat_mean_circular(self) -> StateType: - if len(self.states) > 0: - sin_sum = sum(math.sin(math.radians(x)) for x in self.states) - cos_sum = sum(math.cos(math.radians(x)) for x in self.states) - return (math.degrees(math.atan2(sin_sum, cos_sum)) + 360) % 360 - return None - - def _stat_median(self) -> StateType: - if len(self.states) > 0: - return statistics.median(self.states) - return None - - def _stat_noisiness(self) -> StateType: - if len(self.states) == 1: - return 0.0 - if len(self.states) >= 2: - return cast(float, self._stat_sum_differences()) / (len(self.states) - 1) - return None - - def _stat_percentile(self) -> StateType: - if len(self.states) == 1: - return self.states[0] - if len(self.states) >= 2: - percentiles = statistics.quantiles(self.states, n=100, method="exclusive") - return percentiles[self._percentile - 1] - return None - - def _stat_standard_deviation(self) -> StateType: - if len(self.states) == 1: - return 0.0 - if len(self.states) >= 2: - return statistics.stdev(self.states) - return None - - def _stat_sum(self) -> StateType: - if len(self.states) > 0: - return sum(self.states) - return None - - def _stat_sum_differences(self) -> StateType: - if len(self.states) == 1: - return 0.0 - if len(self.states) >= 2: - return sum( - abs(j - i) - for i, j in zip(list(self.states), list(self.states)[1:], strict=False) - ) - return None - - def _stat_sum_differences_nonnegative(self) -> StateType: - if len(self.states) == 1: - return 0.0 - if len(self.states) >= 2: - return sum( - (j - i if j >= i else j - 0) - for i, j in zip(list(self.states), list(self.states)[1:], strict=False) - ) - return None - - def _stat_total(self) -> StateType: - return self._stat_sum() - - def _stat_value_max(self) -> StateType: - if len(self.states) > 0: - return max(self.states) - return None - - def _stat_value_min(self) -> StateType: - if len(self.states) > 0: - return min(self.states) - return None - - def _stat_variance(self) -> StateType: - if len(self.states) == 1: - return 0.0 - if len(self.states) >= 2: - return statistics.variance(self.states) - return None - - # Statistics for binary sensor - - def _stat_binary_average_step(self) -> StateType: - if len(self.states) == 1: - return 100.0 * int(self.states[0] is True) - if len(self.states) >= 2: - on_seconds: float = 0 - for i in range(1, len(self.states)): - if self.states[i - 1] is True: - on_seconds += (self.ages[i] - self.ages[i - 1]).total_seconds() - age_range_seconds = (self.ages[-1] - self.ages[0]).total_seconds() - return 100 / age_range_seconds * on_seconds - return None - - def _stat_binary_average_timeless(self) -> StateType: - return self._stat_binary_mean() - - def _stat_binary_count(self) -> StateType: - return len(self.states) - - def _stat_binary_count_on(self) -> StateType: - return self.states.count(True) - - def _stat_binary_count_off(self) -> StateType: - return self.states.count(False) - - def _stat_binary_datetime_newest(self) -> datetime | None: - return self._stat_datetime_newest() - - def _stat_binary_datetime_oldest(self) -> datetime | None: - return self._stat_datetime_oldest() - - def _stat_binary_mean(self) -> StateType: - if len(self.states) > 0: - return 100.0 / len(self.states) * self.states.count(True) - return None