Improve recorder history queries (#131702)

* Improve recorder history queries

* Remove some comments

* Update StatesManager._oldest_ts when adding pending state

* Update after review

* Improve tests

* Improve post-purge logic

* Avoid calling dt_util.utc_to_timestamp in new code

---------

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Erik Montnemery 2024-11-27 21:12:42 +01:00 committed by GitHub
parent e04b6f0cd8
commit 381d5453b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 102 additions and 38 deletions

View File

@ -22,7 +22,7 @@ import homeassistant.util.dt as dt_util
from . import websocket_api
from .const import DOMAIN
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
from .helpers import entities_may_have_state_changes_after, has_states_before
CONF_ORDER = "use_include_order"
@ -107,7 +107,10 @@ class HistoryPeriodView(HomeAssistantView):
no_attributes = "no_attributes" in request.query
if (
(end_time and not has_recorder_run_after(hass, end_time))
# has_states_before will return True if there are states older than
# end_time. If it's false, we know there are no states in the
# database up until end_time.
(end_time and not has_states_before(hass, end_time))
or not include_start_time_state
and entity_ids
and not entities_may_have_state_changes_after(

View File

@ -6,7 +6,6 @@ from collections.abc import Iterable
from datetime import datetime as dt
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.core import HomeAssistant
@ -26,8 +25,10 @@ def entities_may_have_state_changes_after(
return False
def has_recorder_run_after(hass: HomeAssistant, run_time: dt) -> bool:
"""Check if the recorder has any runs after a specific time."""
return run_time >= process_timestamp(
get_instance(hass).recorder_runs_manager.first.start
)
def has_states_before(hass: HomeAssistant, run_time: dt) -> bool:
"""Check if the recorder has states as old or older than run_time.
Returns True if there may be such states.
"""
oldest_ts = get_instance(hass).states_manager.oldest_ts
return oldest_ts is not None and run_time.timestamp() >= oldest_ts

View File

@ -39,7 +39,7 @@ from homeassistant.util.async_ import create_eager_task
import homeassistant.util.dt as dt_util
from .const import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES
from .helpers import entities_may_have_state_changes_after, has_recorder_run_after
from .helpers import entities_may_have_state_changes_after, has_states_before
_LOGGER = logging.getLogger(__name__)
@ -142,7 +142,10 @@ async def ws_get_history_during_period(
no_attributes = msg["no_attributes"]
if (
(end_time and not has_recorder_run_after(hass, end_time))
# has_states_before will return True if there are states older than
# end_time. If it's false, we know there are no states in the
# database up until end_time.
(end_time and not has_states_before(hass, end_time))
or not include_start_time_state
and entity_ids
and not entities_may_have_state_changes_after(

View File

@ -1424,6 +1424,7 @@ class Recorder(threading.Thread):
with session_scope(session=self.get_session()) as session:
end_incomplete_runs(session, self.recorder_runs_manager.recording_start)
self.recorder_runs_manager.start(session)
self.states_manager.load_from_db(session)
self._open_event_session()

View File

@ -22,9 +22,9 @@ from homeassistant.core import HomeAssistant, State, split_entity_id
from homeassistant.helpers.recorder import get_instance
import homeassistant.util.dt as dt_util
from ..db_schema import RecorderRuns, StateAttributes, States
from ..db_schema import StateAttributes, States
from ..filters import Filters
from ..models import process_timestamp, process_timestamp_to_utc_isoformat
from ..models import process_timestamp_to_utc_isoformat
from ..models.legacy import LegacyLazyState, legacy_row_to_compressed_state
from ..util import execute_stmt_lambda_element, session_scope
from .const import (
@ -436,7 +436,7 @@ def get_last_state_changes(
def _get_states_for_entities_stmt(
run_start: datetime,
run_start_ts: float,
utc_point_in_time: datetime,
entity_ids: list[str],
no_attributes: bool,
@ -447,7 +447,6 @@ def _get_states_for_entities_stmt(
)
# We got an include-list of entities, accelerate the query by filtering already
# in the inner query.
run_start_ts = process_timestamp(run_start).timestamp()
utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
stmt += lambda q: q.join(
(
@ -483,7 +482,7 @@ def _get_rows_with_session(
session: Session,
utc_point_in_time: datetime,
entity_ids: list[str],
run: RecorderRuns | None = None,
*,
no_attributes: bool = False,
) -> Iterable[Row]:
"""Return the states at a specific point in time."""
@ -495,17 +494,16 @@ def _get_rows_with_session(
),
)
if run is None:
run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
oldest_ts = get_instance(hass).states_manager.oldest_ts
if run is None or process_timestamp(run.start) > utc_point_in_time:
# History did not run before utc_point_in_time
if oldest_ts is None or oldest_ts > utc_point_in_time.timestamp():
# We don't have any states for the requested time
return []
# We have more than one entity to look at so we need to do a query on states
# since the last recorder run started.
stmt = _get_states_for_entities_stmt(
run.start, utc_point_in_time, entity_ids, no_attributes
oldest_ts, utc_point_in_time, entity_ids, no_attributes
)
return execute_stmt_lambda_element(session, stmt)

View File

@ -34,7 +34,6 @@ from ..models import (
LazyState,
datetime_to_timestamp_or_none,
extract_metadata_ids,
process_timestamp,
row_to_compressed_state,
)
from ..util import execute_stmt_lambda_element, session_scope
@ -246,9 +245,9 @@ def get_significant_states_with_session(
if metadata_id is not None
and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS
]
run_start_ts: float | None = None
oldest_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
oldest_ts := _get_oldest_possible_ts(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
@ -264,7 +263,7 @@ def get_significant_states_with_session(
significant_changes_only,
no_attributes,
include_start_time_state,
run_start_ts,
oldest_ts,
),
track_on=[
bool(single_metadata_id),
@ -411,9 +410,9 @@ def state_changes_during_period(
entity_id_to_metadata_id: dict[str, int | None] = {
entity_id: single_metadata_id
}
run_start_ts: float | None = None
oldest_ts: float | None = None
if include_start_time_state and not (
run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time)
oldest_ts := _get_oldest_possible_ts(hass, start_time)
):
include_start_time_state = False
start_time_ts = dt_util.utc_to_timestamp(start_time)
@ -426,7 +425,7 @@ def state_changes_during_period(
no_attributes,
limit,
include_start_time_state,
run_start_ts,
oldest_ts,
has_last_reported,
),
track_on=[
@ -600,17 +599,17 @@ def _get_start_time_state_for_entities_stmt(
)
def _get_run_start_ts_for_utc_point_in_time(
def _get_oldest_possible_ts(
hass: HomeAssistant, utc_point_in_time: datetime
) -> float | None:
"""Return the start time of a run."""
run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if (
run is not None
and (run_start := process_timestamp(run.start)) < utc_point_in_time
):
return run_start.timestamp()
# History did not run before utc_point_in_time but we still
"""Return the oldest possible timestamp.
Returns None if there are no states as old as utc_point_in_time.
"""
oldest_ts = get_instance(hass).states_manager.oldest_ts
if oldest_ts is not None and oldest_ts < utc_point_in_time.timestamp():
return oldest_ts
return None

View File

@ -123,6 +123,9 @@ def purge_old_data(
_purge_old_entity_ids(instance, session)
_purge_old_recorder_runs(instance, session, purge_before)
with session_scope(session=instance.get_session(), read_only=True) as session:
instance.recorder_runs_manager.load_from_db(session)
instance.states_manager.load_from_db(session)
if repack:
repack_database(instance)
return True

View File

@ -637,6 +637,15 @@ def find_states_to_purge(
)
def find_oldest_state() -> StatementLambdaElement:
"""Find the last_updated_ts of the oldest state."""
return lambda_stmt(
lambda: select(States.last_updated_ts).where(
States.state_id.in_(select(func.min(States.state_id)))
)
)
def find_short_term_statistics_to_purge(
purge_before: datetime, max_bind_vars: int
) -> StatementLambdaElement:

View File

@ -2,7 +2,15 @@
from __future__ import annotations
from collections.abc import Sequence
from typing import Any, cast
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session
from ..db_schema import States
from ..queries import find_oldest_state
from ..util import execute_stmt_lambda_element
class StatesManager:
@ -13,6 +21,12 @@ class StatesManager:
self._pending: dict[str, States] = {}
self._last_committed_id: dict[str, int] = {}
self._last_reported: dict[int, float] = {}
self._oldest_ts: float | None = None
@property
def oldest_ts(self) -> float | None:
"""Return the oldest timestamp."""
return self._oldest_ts
def pop_pending(self, entity_id: str) -> States | None:
"""Pop a pending state.
@ -44,6 +58,8 @@ class StatesManager:
recorder thread.
"""
self._pending[entity_id] = state
if self._oldest_ts is None:
self._oldest_ts = state.last_updated_ts
def update_pending_last_reported(
self, state_id: int, last_reported_timestamp: float
@ -74,6 +90,22 @@ class StatesManager:
"""
self._last_committed_id.clear()
self._pending.clear()
self._oldest_ts = None
def load_from_db(self, session: Session) -> None:
"""Update the cache.
Must run in the recorder thread.
"""
result = cast(
Sequence[Row[Any]],
execute_stmt_lambda_element(session, find_oldest_state()),
)
if not result:
ts = None
else:
ts = result[0].last_updated_ts
self._oldest_ts = ts
def evict_purged_state_ids(self, purged_state_ids: set[int]) -> None:
"""Evict purged states from the committed states.

View File

@ -120,8 +120,6 @@ class PurgeTask(RecorderTask):
if purge.purge_old_data(
instance, self.purge_before, self.repack, self.apply_filter
):
with instance.get_session() as session:
instance.recorder_runs_manager.load_from_db(session)
# We always need to do the db cleanups after a purge
# is finished to ensure the WAL checkpoint and other
# tasks happen after a vacuum.

View File

@ -112,6 +112,9 @@ async def test_purge_big_database(hass: HomeAssistant, recorder_mock: Recorder)
async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test deleting old states."""
assert recorder_mock.states_manager.oldest_ts is None
oldest_ts = recorder_mock.states_manager.oldest_ts
await _add_test_states(hass)
# make sure we start with 6 states
@ -127,6 +130,10 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
events = session.query(Events).filter(Events.event_type == "state_changed")
assert events.count() == 0
assert recorder_mock.states_manager.oldest_ts != oldest_ts
assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts
oldest_ts = recorder_mock.states_manager.oldest_ts
assert "test.recorder2" in recorder_mock.states_manager._last_committed_id
purge_before = dt_util.utcnow() - timedelta(days=4)
@ -140,6 +147,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
repack=False,
)
assert not finished
# states_manager.oldest_ts is not updated until after the purge is complete
assert recorder_mock.states_manager.oldest_ts == oldest_ts
with session_scope(hass=hass) as session:
states = session.query(States)
@ -162,6 +171,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
finished = purge_old_data(recorder_mock, purge_before, repack=False)
assert finished
# states_manager.oldest_ts should now be updated
assert recorder_mock.states_manager.oldest_ts != oldest_ts
with session_scope(hass=hass) as session:
states = session.query(States)
@ -169,6 +180,10 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
assert states.count() == 2
assert state_attributes.count() == 1
assert recorder_mock.states_manager.oldest_ts != oldest_ts
assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts
oldest_ts = recorder_mock.states_manager.oldest_ts
assert "test.recorder2" in recorder_mock.states_manager._last_committed_id
# run purge_old_data again
@ -181,6 +196,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) ->
repack=False,
)
assert not finished
# states_manager.oldest_ts is not updated until after the purge is complete
assert recorder_mock.states_manager.oldest_ts == oldest_ts
with session_scope(hass=hass) as session:
assert states.count() == 0