Rename recorder run_history to table_managers.recorder_runs_manager (#90070)

This commit is contained in:
J. Nick Koston 2023-03-21 18:38:33 -10:00 committed by GitHub
parent 1439a3d572
commit 96225bb287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 51 additions and 39 deletions

View File

@ -80,9 +80,9 @@ from .queries import (
has_events_context_ids_to_migrate, has_events_context_ids_to_migrate,
has_states_context_ids_to_migrate, has_states_context_ids_to_migrate,
) )
from .run_history import RunHistory
from .table_managers.event_data import EventDataManager from .table_managers.event_data import EventDataManager
from .table_managers.event_types import EventTypeManager from .table_managers.event_types import EventTypeManager
from .table_managers.recorder_runs import RecorderRunsManager
from .table_managers.state_attributes import StateAttributesManager from .table_managers.state_attributes import StateAttributesManager
from .table_managers.states import StatesManager from .table_managers.states import StatesManager
from .table_managers.states_meta import StatesMetaManager from .table_managers.states_meta import StatesMetaManager
@ -198,7 +198,6 @@ class Recorder(threading.Thread):
self.async_recorder_ready = asyncio.Event() self.async_recorder_ready = asyncio.Event()
self._queue_watch = threading.Event() self._queue_watch = threading.Event()
self.engine: Engine | None = None self.engine: Engine | None = None
self.run_history = RunHistory()
# The entity_filter is exposed on the recorder instance so that # The entity_filter is exposed on the recorder instance so that
# it can be used to see if an entity is being recorded and is called # it can be used to see if an entity is being recorded and is called
@ -208,6 +207,8 @@ class Recorder(threading.Thread):
self.schema_version = 0 self.schema_version = 0
self._commits_without_expire = 0 self._commits_without_expire = 0
self.recorder_runs_manager = RecorderRunsManager()
self.states_manager = StatesManager() self.states_manager = StatesManager()
self.event_data_manager = EventDataManager(self) self.event_data_manager = EventDataManager(self)
self.event_type_manager = EventTypeManager(self) self.event_type_manager = EventTypeManager(self)
@ -216,6 +217,7 @@ class Recorder(threading.Thread):
self, exclude_attributes_by_domain self, exclude_attributes_by_domain
) )
self.statistics_meta_manager = StatisticsMetaManager(self) self.statistics_meta_manager = StatisticsMetaManager(self)
self.event_session: Session | None = None self.event_session: Session | None = None
self._get_session: Callable[[], Session] | None = None self._get_session: Callable[[], Session] | None = None
self._completed_first_database_setup: bool | None = None self._completed_first_database_setup: bool | None = None
@ -1117,7 +1119,7 @@ class Recorder(threading.Thread):
finally: finally:
self._close_connection() self._close_connection()
move_away_broken_database(dburl_to_path(self.db_url)) move_away_broken_database(dburl_to_path(self.db_url))
self.run_history.reset() self.recorder_runs_manager.reset()
self._setup_recorder() self._setup_recorder()
self._setup_run() self._setup_run()
@ -1333,8 +1335,8 @@ class Recorder(threading.Thread):
def _setup_run(self) -> None: def _setup_run(self) -> None:
"""Log the start of the current run and schedule any needed jobs.""" """Log the start of the current run and schedule any needed jobs."""
with session_scope(session=self.get_session()) as session: with session_scope(session=self.get_session()) as session:
end_incomplete_runs(session, self.run_history.recording_start) end_incomplete_runs(session, self.recorder_runs_manager.recording_start)
self.run_history.start(session) self.recorder_runs_manager.start(session)
self._open_event_session() self._open_event_session()
@ -1346,15 +1348,15 @@ class Recorder(threading.Thread):
"""End the recorder session.""" """End the recorder session."""
if self.event_session is None: if self.event_session is None:
return return
if self.run_history.active: if self.recorder_runs_manager.active:
self.run_history.end(self.event_session) self.recorder_runs_manager.end(self.event_session)
try: try:
self._commit_event_session_or_retry() self._commit_event_session_or_retry()
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error saving the event session during shutdown: %s", err) _LOGGER.exception("Error saving the event session during shutdown: %s", err)
self.event_session.close() self.event_session.close()
self.run_history.clear() self.recorder_runs_manager.clear()
def _shutdown(self) -> None: def _shutdown(self) -> None:
"""Save end time for current run.""" """Save end time for current run."""

View File

@ -742,7 +742,7 @@ def _get_rows_with_session(
) )
if run is None: if run is None:
run = recorder.get_instance(hass).run_history.get(utc_point_in_time) run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if run is None or process_timestamp(run.start) > utc_point_in_time: if run is None or process_timestamp(run.start) > utc_point_in_time:
# History did not run before utc_point_in_time # History did not run before utc_point_in_time

View File

@ -577,7 +577,7 @@ def _get_rows_with_session(
) )
if run is None: if run is None:
run = recorder.get_instance(hass).run_history.get(utc_point_in_time) run = recorder.get_instance(hass).recorder_runs_manager.get(utc_point_in_time)
if run is None or process_timestamp(run.start) > utc_point_in_time: if run is None or process_timestamp(run.start) > utc_point_in_time:
# History did not run before utc_point_in_time # History did not run before utc_point_in_time

View File

@ -517,7 +517,9 @@ def _purge_old_recorder_runs(
"""Purge all old recorder runs.""" """Purge all old recorder runs."""
# Recorder runs is small, no need to batch run it # Recorder runs is small, no need to batch run it
deleted_rows = session.execute( deleted_rows = session.execute(
delete_recorder_runs_rows(purge_before, instance.run_history.current.run_id) delete_recorder_runs_rows(
purge_before, instance.recorder_runs_manager.current.run_id
)
) )
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows) _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)

View File

@ -58,7 +58,7 @@ async def system_health_info(hass: HomeAssistant) -> dict[str, Any]:
"""Get info for the info page.""" """Get info for the info page."""
instance = get_instance(hass) instance = get_instance(hass)
run_history = instance.run_history recorder_runs_manager = instance.recorder_runs_manager
database_name = urlparse(instance.db_url).path.lstrip("/") database_name = urlparse(instance.db_url).path.lstrip("/")
db_engine_info = _async_get_db_engine_info(instance) db_engine_info = _async_get_db_engine_info(instance)
db_stats: dict[str, Any] = {} db_stats: dict[str, Any] = {}
@ -68,7 +68,7 @@ async def system_health_info(hass: HomeAssistant) -> dict[str, Any]:
_get_db_stats, instance, database_name _get_db_stats, instance, database_name
) )
db_runs = { db_runs = {
"oldest_recorder_run": run_history.first.start, "oldest_recorder_run": recorder_runs_manager.first.start,
"current_recorder_run": run_history.current.start, "current_recorder_run": recorder_runs_manager.current.start,
} }
return db_runs | db_stats | db_engine_info return db_runs | db_stats | db_engine_info

View File

@ -9,8 +9,8 @@ from sqlalchemy.orm.session import Session
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from .db_schema import RecorderRuns from ..db_schema import RecorderRuns
from .models import process_timestamp from ..models import process_timestamp
def _find_recorder_run_for_start_time( def _find_recorder_run_for_start_time(
@ -40,7 +40,7 @@ class _RecorderRunsHistory:
runs_by_timestamp: dict[int, RecorderRuns] runs_by_timestamp: dict[int, RecorderRuns]
class RunHistory: class RecorderRunsManager:
"""Track recorder run history.""" """Track recorder run history."""
def __init__(self) -> None: def __init__(self) -> None:

View File

@ -113,7 +113,7 @@ class PurgeTask(RecorderTask):
instance, self.purge_before, self.repack, self.apply_filter instance, self.purge_before, self.repack, self.apply_filter
): ):
with instance.get_session() as session: with instance.get_session() as session:
instance.run_history.load_from_db(session) instance.recorder_runs_manager.load_from_db(session)
# We always need to do the db cleanups after a purge # We always need to do the db cleanups after a purge
# is finished to ensure the WAL checkpoint and other # is finished to ensure the WAL checkpoint and other
# tasks happen after a vacuum. # tasks happen after a vacuum.

View File

@ -1,4 +1,4 @@
"""Test run history.""" """Test recorder runs table manager."""
from datetime import timedelta from datetime import timedelta
from unittest.mock import patch from unittest.mock import patch
@ -25,29 +25,35 @@ async def test_run_history(recorder_mock: Recorder, hass: HomeAssistant) -> None
session.add(RecorderRuns(start=two_days_ago, created=two_days_ago)) session.add(RecorderRuns(start=two_days_ago, created=two_days_ago))
session.add(RecorderRuns(start=one_day_ago, created=one_day_ago)) session.add(RecorderRuns(start=one_day_ago, created=one_day_ago))
session.commit() session.commit()
instance.run_history.load_from_db(session) instance.recorder_runs_manager.load_from_db(session)
assert ( assert (
process_timestamp( process_timestamp(
instance.run_history.get(three_days_ago + timedelta(microseconds=1)).start instance.recorder_runs_manager.get(
three_days_ago + timedelta(microseconds=1)
).start
) )
== three_days_ago == three_days_ago
) )
assert ( assert (
process_timestamp( process_timestamp(
instance.run_history.get(two_days_ago + timedelta(microseconds=1)).start instance.recorder_runs_manager.get(
two_days_ago + timedelta(microseconds=1)
).start
) )
== two_days_ago == two_days_ago
) )
assert ( assert (
process_timestamp( process_timestamp(
instance.run_history.get(one_day_ago + timedelta(microseconds=1)).start instance.recorder_runs_manager.get(
one_day_ago + timedelta(microseconds=1)
).start
) )
== one_day_ago == one_day_ago
) )
assert ( assert (
process_timestamp(instance.run_history.get(now).start) process_timestamp(instance.recorder_runs_manager.get(now).start)
== instance.run_history.recording_start == instance.recorder_runs_manager.recording_start
) )
@ -64,10 +70,10 @@ async def test_run_history_while_recorder_is_not_yet_started(
# Prevent the run history from starting to ensure # Prevent the run history from starting to ensure
# we can test run_history.current.start returns the expected value # we can test run_history.current.start returns the expected value
with patch( with patch(
"homeassistant.components.recorder.run_history.RunHistory.start", "homeassistant.components.recorder.table_managers.recorder_runs.RecorderRunsManager.start",
): ):
instance = await async_setup_recorder_instance(hass) instance = await async_setup_recorder_instance(hass)
run_history = instance.run_history run_history = instance.recorder_runs_manager
assert run_history.current.start == run_history.recording_start assert run_history.current.start == run_history.recording_start
def _start_run_history(): def _start_run_history():

View File

@ -1551,7 +1551,7 @@ async def test_database_corruption_while_running(
await hass.async_block_till_done() await hass.async_block_till_done()
caplog.clear() caplog.clear()
original_start_time = get_instance(hass).run_history.recording_start original_start_time = get_instance(hass).recorder_runs_manager.recording_start
hass.states.async_set("test.lost", "on", {}) hass.states.async_set("test.lost", "on", {})
@ -1599,7 +1599,7 @@ async def test_database_corruption_while_running(
assert state.entity_id == "test.two" assert state.entity_id == "test.two"
assert state.state == "on" assert state.state == "on"
new_start_time = get_instance(hass).run_history.recording_start new_start_time = get_instance(hass).recorder_runs_manager.recording_start
assert original_start_time < new_start_time assert original_start_time < new_start_time
hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP) hass.bus.async_fire(EVENT_HOMEASSISTANT_STOP)

View File

@ -348,7 +348,7 @@ async def test_schema_migrate(
def _mock_setup_run(self): def _mock_setup_run(self):
self.run_info = RecorderRuns( self.run_info = RecorderRuns(
start=self.run_history.recording_start, created=dt_util.utcnow() start=self.recorder_runs_manager.recording_start, created=dt_util.utcnow()
) )
def _instrument_migrate_schema(*args): def _instrument_migrate_schema(*args):

View File

@ -27,8 +27,8 @@ async def test_recorder_system_health(
info = await get_system_health_info(hass, "recorder") info = await get_system_health_info(hass, "recorder")
instance = get_instance(hass) instance = get_instance(hass)
assert info == { assert info == {
"current_recorder_run": instance.run_history.current.start, "current_recorder_run": instance.recorder_runs_manager.current.start,
"oldest_recorder_run": instance.run_history.first.start, "oldest_recorder_run": instance.recorder_runs_manager.first.start,
"estimated_db_size": ANY, "estimated_db_size": ANY,
"database_engine": SupportedDialect.SQLITE.value, "database_engine": SupportedDialect.SQLITE.value,
"database_version": ANY, "database_version": ANY,
@ -53,8 +53,8 @@ async def test_recorder_system_health_alternate_dbms(
info = await get_system_health_info(hass, "recorder") info = await get_system_health_info(hass, "recorder")
instance = get_instance(hass) instance = get_instance(hass)
assert info == { assert info == {
"current_recorder_run": instance.run_history.current.start, "current_recorder_run": instance.recorder_runs_manager.current.start,
"oldest_recorder_run": instance.run_history.first.start, "oldest_recorder_run": instance.recorder_runs_manager.first.start,
"estimated_db_size": "1.00 MiB", "estimated_db_size": "1.00 MiB",
"database_engine": dialect_name.value, "database_engine": dialect_name.value,
"database_version": ANY, "database_version": ANY,
@ -84,8 +84,8 @@ async def test_recorder_system_health_db_url_missing_host(
): ):
info = await get_system_health_info(hass, "recorder") info = await get_system_health_info(hass, "recorder")
assert info == { assert info == {
"current_recorder_run": instance.run_history.current.start, "current_recorder_run": instance.recorder_runs_manager.current.start,
"oldest_recorder_run": instance.run_history.first.start, "oldest_recorder_run": instance.recorder_runs_manager.first.start,
"estimated_db_size": "1.00 MiB", "estimated_db_size": "1.00 MiB",
"database_engine": dialect_name.value, "database_engine": dialect_name.value,
"database_version": ANY, "database_version": ANY,
@ -102,14 +102,16 @@ async def test_recorder_system_health_crashed_recorder_runs_table(
# This test is specific for SQLite # This test is specific for SQLite
return return
with patch("homeassistant.components.recorder.run_history.RunHistory.load_from_db"): with patch(
"homeassistant.components.recorder.table_managers.recorder_runs.RecorderRunsManager.load_from_db"
):
assert await async_setup_component(hass, "system_health", {}) assert await async_setup_component(hass, "system_health", {})
instance = await async_setup_recorder_instance(hass) instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
info = await get_system_health_info(hass, "recorder") info = await get_system_health_info(hass, "recorder")
assert info == { assert info == {
"current_recorder_run": instance.run_history.current.start, "current_recorder_run": instance.recorder_runs_manager.current.start,
"oldest_recorder_run": instance.run_history.current.start, "oldest_recorder_run": instance.recorder_runs_manager.current.start,
"estimated_db_size": ANY, "estimated_db_size": ANY,
"database_engine": SupportedDialect.SQLITE.value, "database_engine": SupportedDialect.SQLITE.value,
"database_version": ANY, "database_version": ANY,