Fix derivative integration showing unexpected spikes (#65528)

This commit is contained in:
sophof 2022-02-23 19:16:12 +01:00 committed by GitHub
parent 2a697bdf41
commit 79d267f8d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 129 additions and 24 deletions

View File

@ -1,6 +1,7 @@
"""Numeric derivative of data coming from a source sensor over time.""" """Numeric derivative of data coming from a source sensor over time."""
from __future__ import annotations from __future__ import annotations
from datetime import timedelta
from decimal import Decimal, DecimalException from decimal import Decimal, DecimalException
import logging import logging
@ -112,7 +113,9 @@ class DerivativeSensor(RestoreEntity, SensorEntity):
self._sensor_source_id = source_entity self._sensor_source_id = source_entity
self._round_digits = round_digits self._round_digits = round_digits
self._state = 0 self._state = 0
self._state_list = [] # List of tuples with (timestamp, sensor_value) self._state_list = (
[]
) # List of tuples with (timestamp_start, timestamp_end, derivative)
self._name = name if name is not None else f"{source_entity} derivative" self._name = name if name is not None else f"{source_entity} derivative"
@ -149,39 +152,32 @@ class DerivativeSensor(RestoreEntity, SensorEntity):
): ):
return return
now = new_state.last_updated
# Filter out the tuples that are older than (and outside of the) `time_window`
self._state_list = [
(timestamp, state)
for timestamp, state in self._state_list
if (now - timestamp).total_seconds() < self._time_window
]
# It can happen that the list is now empty, in that case
# we use the old_state, because we cannot do anything better.
if len(self._state_list) == 0:
self._state_list.append((old_state.last_updated, old_state.state))
self._state_list.append((new_state.last_updated, new_state.state))
if self._unit_of_measurement is None: if self._unit_of_measurement is None:
unit = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT) unit = new_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
self._unit_of_measurement = self._unit_template.format( self._unit_of_measurement = self._unit_template.format(
"" if unit is None else unit "" if unit is None else unit
) )
try: # filter out all derivatives older than `time_window` from our window list
# derivative of previous measures. self._state_list = [
last_time, last_value = self._state_list[-1] (time_start, time_end, state)
first_time, first_value = self._state_list[0] for time_start, time_end, state in self._state_list
if (new_state.last_updated - time_end).total_seconds()
< self._time_window
]
elapsed_time = (last_time - first_time).total_seconds() try:
delta_value = Decimal(last_value) - Decimal(first_value) elapsed_time = (
derivative = ( new_state.last_updated - old_state.last_updated
).total_seconds()
delta_value = Decimal(new_state.state) - Decimal(old_state.state)
new_derivative = (
delta_value delta_value
/ Decimal(elapsed_time) / Decimal(elapsed_time)
/ Decimal(self._unit_prefix) / Decimal(self._unit_prefix)
* Decimal(self._unit_time) * Decimal(self._unit_time)
) )
assert isinstance(derivative, Decimal)
except ValueError as err: except ValueError as err:
_LOGGER.warning("While calculating derivative: %s", err) _LOGGER.warning("While calculating derivative: %s", err)
except DecimalException as err: except DecimalException as err:
@ -190,9 +186,32 @@ class DerivativeSensor(RestoreEntity, SensorEntity):
) )
except AssertionError as err: except AssertionError as err:
_LOGGER.error("Could not calculate derivative: %s", err) _LOGGER.error("Could not calculate derivative: %s", err)
# add latest derivative to the window list
self._state_list.append(
(old_state.last_updated, new_state.last_updated, new_derivative)
)
def calculate_weight(start, end, now):
window_start = now - timedelta(seconds=self._time_window)
if start < window_start:
weight = (end - window_start).total_seconds() / self._time_window
else:
weight = (end - start).total_seconds() / self._time_window
return weight
# If outside of time window just report derivative (is the same as modeling it in the window),
# otherwise take the weighted average with the previous derivatives
if elapsed_time > self._time_window:
derivative = new_derivative
else: else:
self._state = derivative derivative = 0
self.async_write_ha_state() for (start, end, value) in self._state_list:
weight = calculate_weight(start, end, new_state.last_updated)
derivative = derivative + (value * Decimal(weight))
self._state = derivative
self.async_write_ha_state()
async_track_state_change_event( async_track_state_change_event(
self.hass, [self._sensor_source_id], calc_derivative self.hass, [self._sensor_source_id], calc_derivative

View File

@ -1,5 +1,7 @@
"""The tests for the derivative sensor platform.""" """The tests for the derivative sensor platform."""
from datetime import timedelta from datetime import timedelta
from math import sin
import random
from unittest.mock import patch from unittest.mock import patch
from homeassistant.const import POWER_WATT, TIME_HOURS, TIME_MINUTES, TIME_SECONDS from homeassistant.const import POWER_WATT, TIME_HOURS, TIME_MINUTES, TIME_SECONDS
@ -180,6 +182,90 @@ async def test_data_moving_average_for_discrete_sensor(hass):
assert abs(1 - derivative) <= 0.1 + 1e-6 assert abs(1 - derivative) <= 0.1 + 1e-6
async def test_data_moving_average_for_irregular_times(hass):
"""Test derivative sensor state."""
# We simulate the following situation:
# The temperature rises 1 °C per minute for 30 minutes long.
# There is 60 random datapoints (and the start and end) and the signal is normally distributed
# around the expected value with ±0.1°C
# We use a time window of 1 minute and expect an error of less than the standard deviation. (0.01)
time_window = 60
random.seed(0)
times = sorted(random.sample(range(1800), 60))
def temp_function(time):
random.seed(0)
temp = time / (600)
return random.gauss(temp, 0.1)
temperature_values = list(map(temp_function, times))
config, entity_id = await _setup_sensor(
hass,
{
"time_window": {"seconds": time_window},
"unit_time": TIME_MINUTES,
"round": 3,
},
)
base = dt_util.utcnow()
for time, value in zip(times, temperature_values):
now = base + timedelta(seconds=time)
with patch("homeassistant.util.dt.utcnow", return_value=now):
hass.states.async_set(entity_id, value, {}, force_update=True)
await hass.async_block_till_done()
if time_window < time and time > times[3]:
state = hass.states.get("sensor.power")
derivative = round(float(state.state), config["sensor"]["round"])
# Test that the error is never more than
# (time_window_in_minutes / true_derivative * 100) = 10% + ε
assert abs(0.1 - derivative) <= 0.01 + 1e-6
async def test_double_signal_after_delay(hass):
"""Test derivative sensor state."""
# The old algorithm would produce extreme values if, after a delay longer than the time window
# there would be two signals, a large spike would be produced. Check explicitly for this situation
time_window = 60
times = [*range(time_window * 10)]
times = times + [
time_window * 20,
time_window * 20 + 0.01,
]
# just apply sine as some sort of temperature change and make sure the change after the delay is very small
temperature_values = [sin(x) for x in times]
temperature_values[-2] = temperature_values[-3] + 0.01
temperature_values[-1] = temperature_values[-2] + 0.01
config, entity_id = await _setup_sensor(
hass,
{
"time_window": {"seconds": time_window},
"unit_time": TIME_MINUTES,
"round": 3,
},
)
base = dt_util.utcnow()
previous = 0
for time, value in zip(times, temperature_values):
now = base + timedelta(seconds=time)
with patch("homeassistant.util.dt.utcnow", return_value=now):
hass.states.async_set(entity_id, value, {}, force_update=True)
await hass.async_block_till_done()
state = hass.states.get("sensor.power")
derivative = round(float(state.state), config["sensor"]["round"])
if time == times[-1]:
# Test that the error is never more than
# (time_window_in_minutes / true_derivative * 100) = 10% + ε
assert abs(previous - derivative) <= 0.01 + 1e-6
previous = derivative
async def test_prefix(hass): async def test_prefix(hass):
"""Test derivative sensor state using a power source.""" """Test derivative sensor state using a power source."""
config = { config = {