diff --git a/homeassistant/components/history/__init__.py b/homeassistant/components/history/__init__.py index 365be06fd2d..7241e1fac9a 100644 --- a/homeassistant/components/history/__init__.py +++ b/homeassistant/components/history/__init__.py @@ -22,7 +22,7 @@ import homeassistant.util.dt as dt_util from . import websocket_api from .const import DOMAIN -from .helpers import entities_may_have_state_changes_after, has_recorder_run_after +from .helpers import entities_may_have_state_changes_after, has_states_before CONF_ORDER = "use_include_order" @@ -107,7 +107,10 @@ class HistoryPeriodView(HomeAssistantView): no_attributes = "no_attributes" in request.query if ( - (end_time and not has_recorder_run_after(hass, end_time)) + # has_states_before will return True if there are states older than + # end_time. If it's false, we know there are no states in the + # database up until end_time. + (end_time and not has_states_before(hass, end_time)) or not include_start_time_state and entity_ids and not entities_may_have_state_changes_after( diff --git a/homeassistant/components/history/helpers.py b/homeassistant/components/history/helpers.py index bd477e7e4ed..2010b7373ff 100644 --- a/homeassistant/components/history/helpers.py +++ b/homeassistant/components/history/helpers.py @@ -6,7 +6,6 @@ from collections.abc import Iterable from datetime import datetime as dt from homeassistant.components.recorder import get_instance -from homeassistant.components.recorder.models import process_timestamp from homeassistant.core import HomeAssistant @@ -26,8 +25,10 @@ def entities_may_have_state_changes_after( return False -def has_recorder_run_after(hass: HomeAssistant, run_time: dt) -> bool: - """Check if the recorder has any runs after a specific time.""" - return run_time >= process_timestamp( - get_instance(hass).recorder_runs_manager.first.start - ) +def has_states_before(hass: HomeAssistant, run_time: dt) -> bool: + """Check if the recorder has states as old or older than run_time. + + Returns True if there may be such states. + """ + oldest_ts = get_instance(hass).states_manager.oldest_ts + return oldest_ts is not None and run_time.timestamp() >= oldest_ts diff --git a/homeassistant/components/history/websocket_api.py b/homeassistant/components/history/websocket_api.py index c85d975c3c9..35f8ed5f1ac 100644 --- a/homeassistant/components/history/websocket_api.py +++ b/homeassistant/components/history/websocket_api.py @@ -39,7 +39,7 @@ from homeassistant.util.async_ import create_eager_task import homeassistant.util.dt as dt_util from .const import EVENT_COALESCE_TIME, MAX_PENDING_HISTORY_STATES -from .helpers import entities_may_have_state_changes_after, has_recorder_run_after +from .helpers import entities_may_have_state_changes_after, has_states_before _LOGGER = logging.getLogger(__name__) @@ -142,7 +142,10 @@ async def ws_get_history_during_period( no_attributes = msg["no_attributes"] if ( - (end_time and not has_recorder_run_after(hass, end_time)) + # has_states_before will return True if there are states older than + # end_time. If it's false, we know there are no states in the + # database up until end_time. + (end_time and not has_states_before(hass, end_time)) or not include_start_time_state and entity_ids and not entities_may_have_state_changes_after( diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 6ba64d4a571..8c2e1c9e006 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -1424,6 +1424,7 @@ class Recorder(threading.Thread): with session_scope(session=self.get_session()) as session: end_incomplete_runs(session, self.recorder_runs_manager.recording_start) self.recorder_runs_manager.start(session) + self.states_manager.load_from_db(session) self._open_event_session() diff --git a/homeassistant/components/recorder/history/legacy.py b/homeassistant/components/recorder/history/legacy.py index b59fc43c3d0..3a0fe79455b 100644 --- a/homeassistant/components/recorder/history/legacy.py +++ b/homeassistant/components/recorder/history/legacy.py @@ -22,9 +22,9 @@ from homeassistant.core import HomeAssistant, State, split_entity_id from homeassistant.helpers.recorder import get_instance import homeassistant.util.dt as dt_util -from ..db_schema import RecorderRuns, StateAttributes, States +from ..db_schema import StateAttributes, States from ..filters import Filters -from ..models import process_timestamp, process_timestamp_to_utc_isoformat +from ..models import process_timestamp_to_utc_isoformat from ..models.legacy import LegacyLazyState, legacy_row_to_compressed_state from ..util import execute_stmt_lambda_element, session_scope from .const import ( @@ -436,7 +436,7 @@ def get_last_state_changes( def _get_states_for_entities_stmt( - run_start: datetime, + run_start_ts: float, utc_point_in_time: datetime, entity_ids: list[str], no_attributes: bool, @@ -447,7 +447,6 @@ def _get_states_for_entities_stmt( ) # We got an include-list of entities, accelerate the query by filtering already # in the inner query. - run_start_ts = process_timestamp(run_start).timestamp() utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time) stmt += lambda q: q.join( ( @@ -483,7 +482,7 @@ def _get_rows_with_session( session: Session, utc_point_in_time: datetime, entity_ids: list[str], - run: RecorderRuns | None = None, + *, no_attributes: bool = False, ) -> Iterable[Row]: """Return the states at a specific point in time.""" @@ -495,17 +494,16 @@ def _get_rows_with_session( ), ) - if run is None: - run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time) + oldest_ts = get_instance(hass).states_manager.oldest_ts - if run is None or process_timestamp(run.start) > utc_point_in_time: - # History did not run before utc_point_in_time + if oldest_ts is None or oldest_ts > utc_point_in_time.timestamp(): + # We don't have any states for the requested time return [] # We have more than one entity to look at so we need to do a query on states # since the last recorder run started. stmt = _get_states_for_entities_stmt( - run.start, utc_point_in_time, entity_ids, no_attributes + oldest_ts, utc_point_in_time, entity_ids, no_attributes ) return execute_stmt_lambda_element(session, stmt) diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py index b44bec0d0ee..902f1b5dc24 100644 --- a/homeassistant/components/recorder/history/modern.py +++ b/homeassistant/components/recorder/history/modern.py @@ -34,7 +34,6 @@ from ..models import ( LazyState, datetime_to_timestamp_or_none, extract_metadata_ids, - process_timestamp, row_to_compressed_state, ) from ..util import execute_stmt_lambda_element, session_scope @@ -246,9 +245,9 @@ def get_significant_states_with_session( if metadata_id is not None and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS ] - run_start_ts: float | None = None + oldest_ts: float | None = None if include_start_time_state and not ( - run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time) + oldest_ts := _get_oldest_possible_ts(hass, start_time) ): include_start_time_state = False start_time_ts = dt_util.utc_to_timestamp(start_time) @@ -264,7 +263,7 @@ def get_significant_states_with_session( significant_changes_only, no_attributes, include_start_time_state, - run_start_ts, + oldest_ts, ), track_on=[ bool(single_metadata_id), @@ -411,9 +410,9 @@ def state_changes_during_period( entity_id_to_metadata_id: dict[str, int | None] = { entity_id: single_metadata_id } - run_start_ts: float | None = None + oldest_ts: float | None = None if include_start_time_state and not ( - run_start_ts := _get_run_start_ts_for_utc_point_in_time(hass, start_time) + oldest_ts := _get_oldest_possible_ts(hass, start_time) ): include_start_time_state = False start_time_ts = dt_util.utc_to_timestamp(start_time) @@ -426,7 +425,7 @@ def state_changes_during_period( no_attributes, limit, include_start_time_state, - run_start_ts, + oldest_ts, has_last_reported, ), track_on=[ @@ -600,17 +599,17 @@ def _get_start_time_state_for_entities_stmt( ) -def _get_run_start_ts_for_utc_point_in_time( +def _get_oldest_possible_ts( hass: HomeAssistant, utc_point_in_time: datetime ) -> float | None: - """Return the start time of a run.""" - run = get_instance(hass).recorder_runs_manager.get(utc_point_in_time) - if ( - run is not None - and (run_start := process_timestamp(run.start)) < utc_point_in_time - ): - return run_start.timestamp() - # History did not run before utc_point_in_time but we still + """Return the oldest possible timestamp. + + Returns None if there are no states as old as utc_point_in_time. + """ + + oldest_ts = get_instance(hass).states_manager.oldest_ts + if oldest_ts is not None and oldest_ts < utc_point_in_time.timestamp(): + return oldest_ts return None diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 329f48e5455..28a5a2ed32d 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -123,6 +123,9 @@ def purge_old_data( _purge_old_entity_ids(instance, session) _purge_old_recorder_runs(instance, session, purge_before) + with session_scope(session=instance.get_session(), read_only=True) as session: + instance.recorder_runs_manager.load_from_db(session) + instance.states_manager.load_from_db(session) if repack: repack_database(instance) return True diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index 2e4b588a0b0..8ca7bef2691 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -637,6 +637,15 @@ def find_states_to_purge( ) +def find_oldest_state() -> StatementLambdaElement: + """Find the last_updated_ts of the oldest state.""" + return lambda_stmt( + lambda: select(States.last_updated_ts).where( + States.state_id.in_(select(func.min(States.state_id))) + ) + ) + + def find_short_term_statistics_to_purge( purge_before: datetime, max_bind_vars: int ) -> StatementLambdaElement: diff --git a/homeassistant/components/recorder/table_managers/states.py b/homeassistant/components/recorder/table_managers/states.py index d5cef759c54..fafcfa0ea61 100644 --- a/homeassistant/components/recorder/table_managers/states.py +++ b/homeassistant/components/recorder/table_managers/states.py @@ -2,7 +2,15 @@ from __future__ import annotations +from collections.abc import Sequence +from typing import Any, cast + +from sqlalchemy.engine.row import Row +from sqlalchemy.orm.session import Session + from ..db_schema import States +from ..queries import find_oldest_state +from ..util import execute_stmt_lambda_element class StatesManager: @@ -13,6 +21,12 @@ class StatesManager: self._pending: dict[str, States] = {} self._last_committed_id: dict[str, int] = {} self._last_reported: dict[int, float] = {} + self._oldest_ts: float | None = None + + @property + def oldest_ts(self) -> float | None: + """Return the oldest timestamp.""" + return self._oldest_ts def pop_pending(self, entity_id: str) -> States | None: """Pop a pending state. @@ -44,6 +58,8 @@ class StatesManager: recorder thread. """ self._pending[entity_id] = state + if self._oldest_ts is None: + self._oldest_ts = state.last_updated_ts def update_pending_last_reported( self, state_id: int, last_reported_timestamp: float @@ -74,6 +90,22 @@ class StatesManager: """ self._last_committed_id.clear() self._pending.clear() + self._oldest_ts = None + + def load_from_db(self, session: Session) -> None: + """Update the cache. + + Must run in the recorder thread. + """ + result = cast( + Sequence[Row[Any]], + execute_stmt_lambda_element(session, find_oldest_state()), + ) + if not result: + ts = None + else: + ts = result[0].last_updated_ts + self._oldest_ts = ts def evict_purged_state_ids(self, purged_state_ids: set[int]) -> None: """Evict purged states from the committed states. diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index 783f0a80b8e..fa10c12aa68 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -120,8 +120,6 @@ class PurgeTask(RecorderTask): if purge.purge_old_data( instance, self.purge_before, self.repack, self.apply_filter ): - with instance.get_session() as session: - instance.recorder_runs_manager.load_from_db(session) # We always need to do the db cleanups after a purge # is finished to ensure the WAL checkpoint and other # tasks happen after a vacuum. diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index ca160e5201b..f721a260c14 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -112,6 +112,9 @@ async def test_purge_big_database(hass: HomeAssistant, recorder_mock: Recorder) async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> None: """Test deleting old states.""" + assert recorder_mock.states_manager.oldest_ts is None + oldest_ts = recorder_mock.states_manager.oldest_ts + await _add_test_states(hass) # make sure we start with 6 states @@ -127,6 +130,10 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> events = session.query(Events).filter(Events.event_type == "state_changed") assert events.count() == 0 + assert recorder_mock.states_manager.oldest_ts != oldest_ts + assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts + oldest_ts = recorder_mock.states_manager.oldest_ts + assert "test.recorder2" in recorder_mock.states_manager._last_committed_id purge_before = dt_util.utcnow() - timedelta(days=4) @@ -140,6 +147,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> repack=False, ) assert not finished + # states_manager.oldest_ts is not updated until after the purge is complete + assert recorder_mock.states_manager.oldest_ts == oldest_ts with session_scope(hass=hass) as session: states = session.query(States) @@ -162,6 +171,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> finished = purge_old_data(recorder_mock, purge_before, repack=False) assert finished + # states_manager.oldest_ts should now be updated + assert recorder_mock.states_manager.oldest_ts != oldest_ts with session_scope(hass=hass) as session: states = session.query(States) @@ -169,6 +180,10 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> assert states.count() == 2 assert state_attributes.count() == 1 + assert recorder_mock.states_manager.oldest_ts != oldest_ts + assert recorder_mock.states_manager.oldest_ts == states[0].last_updated_ts + oldest_ts = recorder_mock.states_manager.oldest_ts + assert "test.recorder2" in recorder_mock.states_manager._last_committed_id # run purge_old_data again @@ -181,6 +196,8 @@ async def test_purge_old_states(hass: HomeAssistant, recorder_mock: Recorder) -> repack=False, ) assert not finished + # states_manager.oldest_ts is not updated until after the purge is complete + assert recorder_mock.states_manager.oldest_ts == oldest_ts with session_scope(hass=hass) as session: assert states.count() == 0