Fix race condition in statistics that created spikes (#129066)

* fixed race condition and added test case for updates before db load

* removed duplicated code

* improved comments, removed superfluous errors / assertions

* allow both possible outcomes of race condition

* use approx for float comparison

* Update tests/components/statistics/test_sensor.py

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* force new state before database load in race condition test

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
unfug-at-github 2024-10-26 09:23:47 +02:00 committed by GitHub
parent e774c710a8
commit c5ed148c52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 95 additions and 17 deletions

View File

@ -169,8 +169,8 @@ class StatisticsConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
vol.Required("user_input"): dict,
}
)
@callback
def ws_start_preview(
@websocket_api.async_response
async def ws_start_preview(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
@ -234,6 +234,6 @@ def ws_start_preview(
preview_entity.hass = hass
connection.send_result(msg["id"])
connection.subscriptions[msg["id"]] = preview_entity.async_start_preview(
connection.subscriptions[msg["id"]] = await preview_entity.async_start_preview(
async_preview_updated
)

View File

@ -50,7 +50,6 @@ from homeassistant.helpers.event import (
async_track_state_change_event,
)
from homeassistant.helpers.reload import async_setup_reload_service
from homeassistant.helpers.start import async_at_start
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType, StateType
from homeassistant.util import dt as dt_util
from homeassistant.util.enum import try_parse_enum
@ -373,8 +372,7 @@ class StatisticsSensor(SensorEntity):
self._update_listener: CALLBACK_TYPE | None = None
self._preview_callback: Callable[[str, Mapping[str, Any]], None] | None = None
@callback
def async_start_preview(
async def async_start_preview(
self,
preview_callback: Callable[[str, Mapping[str, Any]], None],
) -> CALLBACK_TYPE:
@ -392,7 +390,7 @@ class StatisticsSensor(SensorEntity):
self._preview_callback = preview_callback
self._async_stats_sensor_startup(self.hass)
await self._async_stats_sensor_startup()
return self._call_on_remove_callbacks
@callback
@ -413,10 +411,16 @@ class StatisticsSensor(SensorEntity):
if not self._preview_callback:
self.async_write_ha_state()
@callback
def _async_stats_sensor_startup(self, _: HomeAssistant) -> None:
"""Add listener and get recorded state."""
async def _async_stats_sensor_startup(self) -> None:
"""Add listener and get recorded state.
Historical data needs to be loaded from the database first before we
can start accepting new incoming changes.
This is needed to ensure that the buffer is properly sorted by time.
"""
_LOGGER.debug("Startup for %s", self.entity_id)
if "recorder" in self.hass.config.components:
await self._initialize_from_database()
self.async_on_remove(
async_track_state_change_event(
self.hass,
@ -424,14 +428,10 @@ class StatisticsSensor(SensorEntity):
self._async_stats_sensor_state_listener,
)
)
if "recorder" in self.hass.config.components:
self.hass.async_create_task(self._initialize_from_database())
async def async_added_to_hass(self) -> None:
"""Register callbacks."""
self.async_on_remove(
async_at_start(self.hass, self._async_stats_sensor_startup)
)
await self._async_stats_sensor_startup()
def _add_state_to_queue(self, new_state: State) -> None:
"""Add the state to the queue."""
@ -712,7 +712,9 @@ class StatisticsSensor(SensorEntity):
"""
value = self._state_characteristic_fn()
_LOGGER.debug(
"Updating value: states: %s, ages: %s => %s", self.states, self.ages, value
)
if self._state_characteristic not in STATS_NOT_A_NUMBER:
with contextlib.suppress(TypeError):
value = round(cast(float, value), self._precision)

View File

@ -2,9 +2,11 @@
from __future__ import annotations
from asyncio import Event as AsyncioEvent
from collections.abc import Sequence
from datetime import datetime, timedelta
import statistics
from threading import Event
from typing import Any
from unittest.mock import patch
@ -12,7 +14,7 @@ from freezegun import freeze_time
import pytest
from homeassistant import config as hass_config
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder import Recorder, history
from homeassistant.components.sensor import (
ATTR_STATE_CLASS,
SensorDeviceClass,
@ -50,6 +52,7 @@ from tests.components.recorder.common import async_wait_recording_done
VALUES_BINARY = ["on", "off", "on", "off", "on", "off", "on", "off", "on"]
VALUES_NUMERIC = [17, 20, 15.2, 5, 3.8, 9.2, 6.7, 14, 6]
VALUES_NUMERIC_LINEAR = [1, 2, 3, 4, 5, 6, 7, 8, 9]
async def test_unique_id(
@ -1701,3 +1704,76 @@ async def test_device_id(
statistics_entity = entity_registry.async_get("sensor.statistics")
assert statistics_entity is not None
assert statistics_entity.device_id == source_entity.device_id
async def test_update_before_load(recorder_mock: Recorder, hass: HomeAssistant) -> None:
"""Verify that updates happening before reloading from the database are handled correctly."""
current_time = dt_util.utcnow()
# enable and pre-fill the recorder
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with (
freeze_time(current_time) as freezer,
):
for value in VALUES_NUMERIC_LINEAR:
hass.states.async_set(
"sensor.test_monitored",
str(value),
{ATTR_UNIT_OF_MEASUREMENT: UnitOfTemperature.CELSIUS},
)
await hass.async_block_till_done()
current_time += timedelta(seconds=1)
freezer.move_to(current_time)
await async_wait_recording_done(hass)
# some synchronisation is needed to prevent that loading from the database finishes too soon
# we want this to take long enough to be able to try to add a value BEFORE loading is done
state_changes_during_period_called_evt = AsyncioEvent()
state_changes_during_period_stall_evt = Event()
real_state_changes_during_period = history.state_changes_during_period
def mock_state_changes_during_period(*args, **kwargs):
states = real_state_changes_during_period(*args, **kwargs)
hass.loop.call_soon_threadsafe(state_changes_during_period_called_evt.set)
state_changes_during_period_stall_evt.wait()
return states
# create the statistics component, get filled from database
with patch(
"homeassistant.components.statistics.sensor.history.state_changes_during_period",
mock_state_changes_during_period,
):
assert await async_setup_component(
hass,
"sensor",
{
"sensor": [
{
"platform": "statistics",
"name": "test",
"entity_id": "sensor.test_monitored",
"state_characteristic": "average_step",
"max_age": {"seconds": 10},
},
]
},
)
# adding this value is going to be ignored, since loading from the database hasn't finished yet
# if this value would be added before loading from the database is done
# it would mess up the order of the internal queue which is supposed to be sorted by time
await state_changes_during_period_called_evt.wait()
hass.states.async_set(
"sensor.test_monitored",
"10",
{ATTR_UNIT_OF_MEASUREMENT: DEGREE},
)
state_changes_during_period_stall_evt.set()
await hass.async_block_till_done()
# we will end up with a buffer of [1 .. 9] (10 wasn't added)
# so the computed average_step is 1+2+3+4+5+6+7+8/8 = 4.5
assert float(hass.states.get("sensor.test").state) == pytest.approx(4.5)