Fix compiling missing statistics losing rows (#101616)

J. Nick Koston 2023-10-08 07:43:00 -10:00 committed by GitHub
parent 1b11062b27
commit c6ed022cce
6 changed files with 252 additions and 133 deletions
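The change threads one database session from the recorder through the statistics platforms: _compile_statistics now passes its session to platform_compile_statistics, the sensor platform stops opening its own read-only session, and get_latest_short_term_statistics becomes get_latest_short_term_statistics_with_session. A minimal sketch of the resulting call shape (illustrative only; compile_one_window is not part of this commit, and the real recorder manages the session itself):

from datetime import datetime, timedelta

from homeassistant.components.recorder.util import session_scope
from homeassistant.components.sensor import recorder as sensor_recorder
from homeassistant.core import HomeAssistant


def compile_one_window(hass: HomeAssistant, start: datetime) -> None:
    """Compile sensor statistics for one 5-minute window in a single session."""
    end = start + timedelta(minutes=5)
    with session_scope(hass=hass) as session:
        # The platform receives the caller's session, so its lookups of the
        # latest short term statistics see rows inserted earlier in the same
        # session, e.g. by a previous window of a catch-up run.
        sensor_recorder.compile_statistics(hass, session, start, end)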

View File

@@ -526,7 +526,7 @@ def _compile_statistics(
):
continue
compiled: PlatformCompiledStatistics = platform_compile_statistics(
instance.hass, start, end
instance.hass, session, start, end
)
_LOGGER.debug(
"Statistics for %s during %s-%s: %s",
@@ -1871,7 +1871,7 @@ def get_latest_short_term_statistics_by_ids(
return list(
cast(
Sequence[Row],
execute_stmt_lambda_element(session, stmt, orm_rows=False),
execute_stmt_lambda_element(session, stmt),
)
)
@@ -1887,75 +1887,69 @@ def _latest_short_term_statistics_by_ids_stmt(
)
def get_latest_short_term_statistics(
def get_latest_short_term_statistics_with_session(
hass: HomeAssistant,
session: Session,
statistic_ids: set[str],
types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
metadata: dict[str, tuple[int, StatisticMetaData]] | None = None,
) -> dict[str, list[StatisticsRow]]:
"""Return the latest short term statistics for a list of statistic_ids."""
with session_scope(hass=hass, read_only=True) as session:
# Fetch metadata for the given statistic_ids
if not metadata:
metadata = get_instance(hass).statistics_meta_manager.get_many(
session, statistic_ids=statistic_ids
)
if not metadata:
return {}
metadata_ids = set(
_extract_metadata_and_discard_impossible_columns(metadata, types)
"""Return the latest short term statistics for a list of statistic_ids with a session."""
# Fetch metadata for the given statistic_ids
if not metadata:
metadata = get_instance(hass).statistics_meta_manager.get_many(
session, statistic_ids=statistic_ids
)
run_cache = get_short_term_statistics_run_cache(hass)
# Try to find the latest short term statistics ids for the metadata_ids
# from the run cache first if we have it. If the run cache references
# a non-existent id because of a purge, we will detect it missing in the
# next step and run a query to re-populate the cache.
stats: list[Row] = []
if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
stats = get_latest_short_term_statistics_by_ids(
session, metadata_id_to_id.values()
)
# If we are missing some metadata_ids in the run cache, we need to run a query
# to populate the cache for each metadata_id, and then run another query
# to get the latest short term statistics for the missing metadata_ids.
if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
found_latest_ids := {
latest_id
for metadata_id in missing_metadata_ids
if (
latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
# orm_rows=False is used here because we are in
# a read-only session, and there will never be
# any pending inserts in the session.
run_cache,
session,
metadata_id,
orm_rows=False,
)
if not metadata:
return {}
metadata_ids = set(
_extract_metadata_and_discard_impossible_columns(metadata, types)
)
run_cache = get_short_term_statistics_run_cache(hass)
# Try to find the latest short term statistics ids for the metadata_ids
# from the run cache first if we have it. If the run cache references
# a non-existent id because of a purge, we will detect it missing in the
# next step and run a query to re-populate the cache.
stats: list[Row] = []
if metadata_id_to_id := run_cache.get_latest_ids(metadata_ids):
stats = get_latest_short_term_statistics_by_ids(
session, metadata_id_to_id.values()
)
# If we are missing some metadata_ids in the run cache, we need to run a query
# to populate the cache for each metadata_id, and then run another query
# to get the latest short term statistics for the missing metadata_ids.
if (missing_metadata_ids := metadata_ids - set(metadata_id_to_id)) and (
found_latest_ids := {
latest_id
for metadata_id in missing_metadata_ids
if (
latest_id := cache_latest_short_term_statistic_id_for_metadata_id(
run_cache,
session,
metadata_id,
)
is not None
}
):
stats.extend(
get_latest_short_term_statistics_by_ids(session, found_latest_ids)
)
is not None
}
):
stats.extend(get_latest_short_term_statistics_by_ids(session, found_latest_ids))
if not stats:
return {}
if not stats:
return {}
# Return statistics combined with metadata
return _sorted_statistics_to_dict(
hass,
session,
stats,
statistic_ids,
metadata,
False,
StatisticsShortTerm,
None,
None,
types,
)
# Return statistics combined with metadata
return _sorted_statistics_to_dict(
hass,
session,
stats,
statistic_ids,
metadata,
False,
StatisticsShortTerm,
None,
None,
types,
)
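Callers that are not already inside the recorder's session now have to open one themselves; the updated tests below all follow this pattern. An illustrative helper (the function name is made up for this example):

from homeassistant.components.recorder.statistics import (
    get_latest_short_term_statistics_with_session,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant


def latest_state_and_sum(hass: HomeAssistant, statistic_id: str):
    """Return the newest short term state/sum rows for one statistic_id."""
    with session_scope(hass=hass, read_only=True) as session:
        return get_latest_short_term_statistics_with_session(
            hass, session, {statistic_id}, {"state", "sum"}
        )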
def _generate_statistics_at_time_stmt(
@@ -2316,14 +2310,8 @@ def _import_statistics_with_session(
# We just inserted new short term statistics, so we need to update the
# ShortTermStatisticsRunCache with the latest id for the metadata_id
run_cache = get_short_term_statistics_run_cache(instance.hass)
#
# Because we are in the same session and we want to read rows
# that have not been flushed yet, we need to pass orm_rows=True
# to cache_latest_short_term_statistic_id_for_metadata_id
# to ensure that it gets the rows that were just inserted
#
cache_latest_short_term_statistic_id_for_metadata_id(
run_cache, session, metadata_id, orm_rows=True
run_cache, session, metadata_id
)
return True
@@ -2341,7 +2329,6 @@ def cache_latest_short_term_statistic_id_for_metadata_id(
run_cache: ShortTermStatisticsRunCache,
session: Session,
metadata_id: int,
orm_rows: bool,
) -> int | None:
"""Cache the latest short term statistic for a given metadata_id.
@@ -2352,13 +2339,7 @@ def cache_latest_short_term_statistic_id_for_metadata_id(
if latest := cast(
Sequence[Row],
execute_stmt_lambda_element(
session,
_find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id),
orm_rows=orm_rows
# _import_statistics_with_session needs to be able
# to read back the rows it just inserted without
# a flush so we have to pass orm_rows so we get
# back the latest data.
session, _find_latest_short_term_statistic_for_metadata_id_stmt(metadata_id)
),
):
id_: int = latest[0].id
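With the orm_rows flag gone, the import path (which has just added rows to the session) and the read path call cache_latest_short_term_statistic_id_for_metadata_id the same way. The SQLAlchemy behaviour that makes a single code path workable here is autoflush: an ORM query issued on a session sees objects added to that session even before a commit. A standalone sketch of that behaviour (plain SQLAlchemy, not Home Assistant code):

from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Stat(Base):
    __tablename__ = "stat"
    id: Mapped[int] = mapped_column(primary_key=True)
    metadata_id: Mapped[int] = mapped_column()


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Stat(id=1, metadata_id=7))
    # No commit or explicit flush yet: autoflush runs before the SELECT, so
    # the pending row is already visible inside this session.
    latest = session.execute(
        select(Stat.id).where(Stat.metadata_id == 7).order_by(Stat.id.desc())
    ).first()
    assert latest is not None and latest.id == 1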

View File

@@ -16,7 +16,6 @@ from homeassistant.components.recorder import (
get_instance,
history,
statistics,
util as recorder_util,
)
from homeassistant.components.recorder.models import (
StatisticData,
@@ -374,27 +373,7 @@ def _timestamp_to_isoformat_or_none(timestamp: float | None) -> str | None:
return dt_util.utc_from_timestamp(timestamp).isoformat()
def compile_statistics(
hass: HomeAssistant, start: datetime.datetime, end: datetime.datetime
) -> statistics.PlatformCompiledStatistics:
"""Compile statistics for all entities during start-end.
Note: This will query the database and must not be run in the event loop
"""
# There is already an active session when this code is called since
# it is called from the recorder statistics. We need to make sure
# this session never gets committed since it would be out of sync
# with the recorder statistics session so we mark it as read only.
#
# If we ever need to write to the database from this function we
# will need to refactor the recorder statistics to use a single
# session.
with recorder_util.session_scope(hass=hass, read_only=True) as session:
compiled = _compile_statistics(hass, session, start, end)
return compiled
def _compile_statistics( # noqa: C901
def compile_statistics( # noqa: C901
hass: HomeAssistant,
session: Session,
start: datetime.datetime,
@@ -471,8 +450,8 @@ def _compile_statistics( # noqa: C901
if "sum" in wanted_statistics[entity_id]:
to_query.add(entity_id)
last_stats = statistics.get_latest_short_term_statistics(
hass, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
last_stats = statistics.get_latest_short_term_statistics_with_session(
hass, session, to_query, {"last_reset", "state", "sum"}, metadata=old_metadatas
)
for ( # pylint: disable=too-many-nested-blocks
entity_id,
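This signature change applies to every recorder statistics platform hook, not only sensor; compare get_fake_stats in the test diff further down, which also gains a session parameter. A hypothetical minimal platform module showing the new shape:

from __future__ import annotations

import datetime

from sqlalchemy.orm.session import Session

from homeassistant.components.recorder import statistics
from homeassistant.core import HomeAssistant


def compile_statistics(
    hass: HomeAssistant,
    session: Session,
    start: datetime.datetime,
    end: datetime.datetime,
) -> statistics.PlatformCompiledStatistics:
    """Return no statistics; only the hook signature is of interest here."""
    return statistics.PlatformCompiledStatistics([], {})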

View File

@@ -6,6 +6,7 @@ from typing import Any
import pytest
from homeassistant.components.energy import data
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.sensor import (
ATTR_LAST_RESET,
ATTR_STATE_CLASS,
@@ -155,7 +156,10 @@ async def test_cost_sensor_price_entity_total_increasing(
"""Test energy cost price from total_increasing type sensor entity."""
def _compile_statistics(_):
return compile_statistics(hass, now, now + timedelta(seconds=1)).platform_stats
with session_scope(hass=hass) as session:
return compile_statistics(
hass, session, now, now + timedelta(seconds=1)
).platform_stats
energy_attributes = {
ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
@@ -365,9 +369,10 @@ async def test_cost_sensor_price_entity_total(
"""Test energy cost price from total type sensor entity."""
def _compile_statistics(_):
return compile_statistics(
hass, now, now + timedelta(seconds=0.17)
).platform_stats
with session_scope(hass=hass) as session:
return compile_statistics(
hass, session, now, now + timedelta(seconds=0.17)
).platform_stats
energy_attributes = {
ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,
@@ -579,7 +584,10 @@ async def test_cost_sensor_price_entity_total_no_reset(
"""Test energy cost price from total type sensor entity with no last_reset."""
def _compile_statistics(_):
return compile_statistics(hass, now, now + timedelta(seconds=1)).platform_stats
with session_scope(hass=hass) as session:
return compile_statistics(
hass, session, now, now + timedelta(seconds=1)
).platform_stats
energy_attributes = {
ATTR_UNIT_OF_MEASUREMENT: UnitOfEnergy.KILO_WATT_HOUR,

View File

@@ -22,7 +22,7 @@ from homeassistant.components.recorder.statistics import (
async_import_statistics,
get_last_short_term_statistics,
get_last_statistics,
get_latest_short_term_statistics,
get_latest_short_term_statistics_with_session,
get_metadata,
get_short_term_statistics_run_cache,
list_statistic_ids,
@@ -71,9 +71,13 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
# Should not fail if there is nothing there yet
stats = get_latest_short_term_statistics(
hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
)
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
)
assert stats == {}
for kwargs in ({}, {"statistic_ids": ["sensor.test1"]}):
@@ -172,28 +176,38 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
)
assert stats == {"sensor.test1": [expected_2]}
stats = get_latest_short_term_statistics(
hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
)
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
)
assert stats == {"sensor.test1": [expected_2]}
# Now wipe the latest_short_term_statistics_ids table and test again
# to make sure we can rebuild the missing data
run_cache = get_short_term_statistics_run_cache(instance.hass)
run_cache._latest_id_by_metadata_id = {}
stats = get_latest_short_term_statistics(
hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
)
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
)
assert stats == {"sensor.test1": [expected_2]}
metadata = get_metadata(hass, statistic_ids={"sensor.test1"})
stats = get_latest_short_term_statistics(
hass,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
metadata=metadata,
)
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
metadata=metadata,
)
assert stats == {"sensor.test1": [expected_2]}
stats = get_last_short_term_statistics(
@@ -225,10 +239,14 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
instance.get_session().query(StatisticsShortTerm).delete()
# Should not fail if there is nothing in the table
stats = get_latest_short_term_statistics(
hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
)
assert stats == {}
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
)
assert stats == {}
# Delete again, and manually wipe the cache since we deleted all the data
instance.get_session().query(StatisticsShortTerm).delete()
@@ -236,9 +254,13 @@ def test_compile_hourly_statistics(hass_recorder: Callable[..., HomeAssistant])
run_cache._latest_id_by_metadata_id = {}
# And test again to make sure there is no data
stats = get_latest_short_term_statistics(
hass, {"sensor.test1"}, {"last_reset", "max", "mean", "min", "state", "sum"}
)
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test1"},
{"last_reset", "max", "mean", "min", "state", "sum"},
)
assert stats == {}
@@ -259,7 +281,7 @@ def mock_sensor_statistics():
"stat": {"start": start},
}
def get_fake_stats(_hass, start, _end):
def get_fake_stats(_hass, session, start, _end):
return statistics.PlatformCompiledStatistics(
[
sensor_stats("sensor.test1", start),

View File

@@ -14,11 +14,12 @@ from homeassistant.components.recorder.db_schema import Statistics, StatisticsShortTerm
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
get_latest_short_term_statistics,
get_latest_short_term_statistics_with_session,
get_metadata,
get_short_term_statistics_run_cache,
list_statistic_ids,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.recorder.websocket_api import UNIT_SCHEMA
from homeassistant.components.sensor import UNIT_CONVERTERS
from homeassistant.core import HomeAssistant
@@ -636,9 +637,13 @@ async def test_statistic_during_period(
"change": (imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"])
* 1000,
}
stats = get_latest_short_term_statistics(
hass, {"sensor.test"}, {"last_reset", "max", "mean", "min", "state", "sum"}
)
with session_scope(hass=hass, read_only=True) as session:
stats = get_latest_short_term_statistics_with_session(
hass,
session,
{"sensor.test"},
{"last_reset", "max", "mean", "min", "state", "sum"},
)
start = imported_stats_5min[-1]["start"].timestamp()
end = start + (5 * 60)
assert stats == {

View File

@@ -0,0 +1,124 @@
"""The tests for sensor recorder platform can catch up."""
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components.recorder.history import get_significant_states
from homeassistant.components.recorder.statistics import (
get_latest_short_term_statistics_with_session,
statistics_during_period,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.helpers import recorder as recorder_helper
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from tests.common import get_test_home_assistant
from tests.components.recorder.common import do_adhoc_statistics, wait_recording_done
POWER_SENSOR_ATTRIBUTES = {
"device_class": "energy",
"state_class": "measurement",
"unit_of_measurement": "kWh",
}
@pytest.fixture(autouse=True)
def disable_db_issue_creation():
"""Disable the creation of the database issue."""
with patch(
"homeassistant.components.recorder.util._async_create_mariadb_range_index_regression_issue"
):
yield
@pytest.mark.timeout(25)
def test_compile_missing_statistics(
freezer: FrozenDateTimeFactory, recorder_db_url: str, tmp_path: Path
) -> None:
"""Test compile missing statistics."""
if recorder_db_url == "sqlite://":
# On-disk database because we need to stop and start hass
# and have it persist.
recorder_db_url = "sqlite:///" + str(tmp_path / "pytest.db")
config = {
"db_url": recorder_db_url,
}
three_days_ago = datetime(2021, 1, 1, 0, 0, 0, tzinfo=dt_util.UTC)
start_time = three_days_ago + timedelta(days=3)
freezer.move_to(three_days_ago)
hass: HomeAssistant = get_test_home_assistant()
hass.state = CoreState.not_running
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "sensor", {})
setup_component(hass, "recorder", {"recorder": config})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
hass.states.set("sensor.test1", "0", POWER_SENSOR_ATTRIBUTES)
wait_recording_done(hass)
two_days_ago = three_days_ago + timedelta(days=1)
freezer.move_to(two_days_ago)
do_adhoc_statistics(hass, start=two_days_ago)
wait_recording_done(hass)
with session_scope(hass=hass, read_only=True) as session:
latest = get_latest_short_term_statistics_with_session(
hass, session, {"sensor.test1"}, {"state", "sum"}
)
latest_stat = latest["sensor.test1"][0]
assert latest_stat["start"] == 1609545600.0
assert latest_stat["end"] == 1609545600.0 + 300
count = 1
past_time = two_days_ago
while past_time <= start_time:
freezer.move_to(past_time)
hass.states.set("sensor.test1", str(count), POWER_SENSOR_ATTRIBUTES)
past_time += timedelta(minutes=5)
count += 1
wait_recording_done(hass)
states = get_significant_states(hass, three_days_ago, past_time, ["sensor.test1"])
assert len(states["sensor.test1"]) == 577
hass.stop()
freezer.move_to(start_time)
hass: HomeAssistant = get_test_home_assistant()
hass.state = CoreState.not_running
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "sensor", {})
hass.states.set("sensor.test1", "0", POWER_SENSOR_ATTRIBUTES)
setup_component(hass, "recorder", {"recorder": config})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass, read_only=True) as session:
latest = get_latest_short_term_statistics_with_session(
hass, session, {"sensor.test1"}, {"state", "sum", "max", "mean", "min"}
)
latest_stat = latest["sensor.test1"][0]
assert latest_stat["start"] == 1609718100.0
assert latest_stat["end"] == 1609718100.0 + 300
assert latest_stat["mean"] == 576.0
assert latest_stat["min"] == 575.0
assert latest_stat["max"] == 576.0
stats = statistics_during_period(
hass,
two_days_ago,
start_time,
units={"energy": "kWh"},
statistic_ids={"sensor.test1"},
period="hour",
types={"mean"},
)
# Make sure we have 48 hours of statistics
assert len(stats["sensor.test1"]) == 48
# Make sure the last mean is 570.5
assert stats["sensor.test1"][-1]["mean"] == 570.5
hass.stop()
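For reference, the epoch values asserted in this test follow directly from the frozen clock: two_days_ago is 2021-01-02 00:00:00 UTC, start_time is 2021-01-04 00:00:00 UTC, each short term statistics row covers a 300 second window, and two full days of hourly statistics give the expected 48 rows. A quick standalone check (illustrative, not part of the test):

from datetime import datetime, timezone

two_days_ago = datetime(2021, 1, 2, tzinfo=timezone.utc)
start_time = datetime(2021, 1, 4, tzinfo=timezone.utc)

# First compiled 5-minute window starts at two_days_ago ...
assert two_days_ago.timestamp() == 1609545600.0
# ... and the newest window after the catch-up ends exactly at start_time.
assert start_time.timestamp() - 300 == 1609718100.0
# Two full days of hourly statistics between the two timestamps.
assert (start_time - two_days_ago).total_seconds() / 3600 == 48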