From 896bf986ebfc5b58344f4e9af02b1b6386caed90 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 8 May 2022 14:47:12 -0500 Subject: [PATCH] Speed up nightly database purges with lambda_stmt (#71537) --- homeassistant/components/recorder/purge.py | 123 +++++--------- homeassistant/components/recorder/queries.py | 165 ++++++++++++++++++- 2 files changed, 205 insertions(+), 83 deletions(-) diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 2c6211ca2cd..f94f3d3d641 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -7,23 +7,32 @@ from itertools import zip_longest import logging from typing import TYPE_CHECKING -from sqlalchemy import func from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct from homeassistant.const import EVENT_STATE_CHANGED from .const import MAX_ROWS_TO_PURGE, SupportedDialect -from .models import ( - EventData, - Events, - RecorderRuns, - StateAttributes, - States, - StatisticsRuns, - StatisticsShortTerm, +from .models import Events, StateAttributes, States +from .queries import ( + attributes_ids_exist_in_states, + attributes_ids_exist_in_states_sqlite, + data_ids_exist_in_events, + data_ids_exist_in_events_sqlite, + delete_event_data_rows, + delete_event_rows, + delete_recorder_runs_rows, + delete_states_attributes_rows, + delete_states_rows, + delete_statistics_runs_rows, + delete_statistics_short_term_rows, + disconnect_states_rows, + find_events_to_purge, + find_latest_statistics_runs_run_id, + find_short_term_statistics_to_purge, + find_states_to_purge, + find_statistics_runs_to_purge, ) -from .queries import attributes_ids_exist_in_states, data_ids_exist_in_events from .repack import repack_database from .util import retryable_database_job, session_scope @@ -101,19 +110,9 @@ def _select_event_state_attributes_ids_data_ids_to_purge( session: Session, purge_before: datetime ) -> 
tuple[set[int], set[int], set[int], set[int]]: """Return a list of event, state, and attribute ids to purge.""" - events = ( - session.query(Events.event_id, Events.data_id) - .filter(Events.time_fired < purge_before) - .limit(MAX_ROWS_TO_PURGE) - .all() - ) + events = session.execute(find_events_to_purge(purge_before)).all() _LOGGER.debug("Selected %s event ids to remove", len(events)) - states = ( - session.query(States.state_id, States.attributes_id) - .filter(States.last_updated < purge_before) - .limit(MAX_ROWS_TO_PURGE) - .all() - ) + states = session.execute(find_states_to_purge(purge_before)).all() _LOGGER.debug("Selected %s state ids to remove", len(states)) event_ids = set() state_ids = set() @@ -152,9 +151,9 @@ def _select_unused_attributes_ids( # seen_ids = { state[0] - for state in session.query(distinct(States.attributes_id)) - .filter(States.attributes_id.in_(attributes_ids)) - .all() + for state in session.execute( + attributes_ids_exist_in_states_sqlite(attributes_ids) + ).all() } else: # @@ -210,9 +209,9 @@ def _select_unused_event_data_ids( if using_sqlite: seen_ids = { state[0] - for state in session.query(distinct(Events.data_id)) - .filter(Events.data_id.in_(data_ids)) - .all() + for state in session.execute( + data_ids_exist_in_events_sqlite(data_ids) + ).all() } else: seen_ids = set() @@ -234,16 +233,11 @@ def _select_statistics_runs_to_purge( session: Session, purge_before: datetime ) -> list[int]: """Return a list of statistic runs to purge, but take care to keep the newest run.""" - statistic_runs = ( - session.query(StatisticsRuns.run_id) - .filter(StatisticsRuns.start < purge_before) - .limit(MAX_ROWS_TO_PURGE) - .all() - ) + statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all() statistic_runs_list = [run.run_id for run in statistic_runs] # Exclude the newest statistics run if ( - last_run := session.query(func.max(StatisticsRuns.run_id)).scalar() + last_run := 
session.execute(find_latest_statistics_runs_run_id()).scalar() ) and last_run in statistic_runs_list: statistic_runs_list.remove(last_run) @@ -255,12 +249,9 @@ def _select_short_term_statistics_to_purge( session: Session, purge_before: datetime ) -> list[int]: """Return a list of short term statistics to purge.""" - statistics = ( - session.query(StatisticsShortTerm.id) - .filter(StatisticsShortTerm.start < purge_before) - .limit(MAX_ROWS_TO_PURGE) - .all() - ) + statistics = session.execute( + find_short_term_statistics_to_purge(purge_before) + ).all() _LOGGER.debug("Selected %s short term statistics to remove", len(statistics)) return [statistic.id for statistic in statistics] @@ -272,18 +263,10 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) # the delete does not fail due to a foreign key constraint # since some databases (MSSQL) cannot do the ON DELETE SET NULL # for us. - disconnected_rows = ( - session.query(States) - .filter(States.old_state_id.in_(state_ids)) - .update({"old_state_id": None}, synchronize_session=False) - ) + disconnected_rows = session.execute(disconnect_states_rows(state_ids)) _LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows) - deleted_rows = ( - session.query(States) - .filter(States.state_id.in_(state_ids)) - .delete(synchronize_session=False) - ) + deleted_rows = session.execute(delete_states_rows(state_ids)) _LOGGER.debug("Deleted %s states", deleted_rows) # Evict eny entries in the old_states cache referring to a purged state @@ -348,12 +331,7 @@ def _purge_attributes_ids( instance: Recorder, session: Session, attributes_ids: set[int] ) -> None: """Delete old attributes ids.""" - - deleted_rows = ( - session.query(StateAttributes) - .filter(StateAttributes.attributes_id.in_(attributes_ids)) - .delete(synchronize_session=False) - ) + deleted_rows = session.execute(delete_states_attributes_rows(attributes_ids)) _LOGGER.debug("Deleted %s attribute states", deleted_rows) # 
Evict any entries in the state_attributes_ids cache referring to a purged state @@ -365,11 +343,7 @@ def _purge_event_data_ids( ) -> None: """Delete old event data ids.""" - deleted_rows = ( - session.query(EventData) - .filter(EventData.data_id.in_(data_ids)) - .delete(synchronize_session=False) - ) + deleted_rows = session.execute(delete_event_data_rows(data_ids)) _LOGGER.debug("Deleted %s data events", deleted_rows) # Evict any entries in the event_data_ids cache referring to a purged state @@ -378,11 +352,7 @@ def _purge_event_data_ids( def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None: """Delete by run_id.""" - deleted_rows = ( - session.query(StatisticsRuns) - .filter(StatisticsRuns.run_id.in_(statistics_runs)) - .delete(synchronize_session=False) - ) + deleted_rows = session.execute(delete_statistics_runs_rows(statistics_runs)) _LOGGER.debug("Deleted %s statistic runs", deleted_rows) @@ -390,21 +360,15 @@ def _purge_short_term_statistics( session: Session, short_term_statistics: list[int] ) -> None: """Delete by id.""" - deleted_rows = ( - session.query(StatisticsShortTerm) - .filter(StatisticsShortTerm.id.in_(short_term_statistics)) - .delete(synchronize_session=False) + deleted_rows = session.execute( + delete_statistics_short_term_rows(short_term_statistics) ) _LOGGER.debug("Deleted %s short term statistics", deleted_rows) def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None: """Delete by event id.""" - deleted_rows = ( - session.query(Events) - .filter(Events.event_id.in_(event_ids)) - .delete(synchronize_session=False) - ) + deleted_rows = session.execute(delete_event_rows(event_ids)) _LOGGER.debug("Deleted %s events", deleted_rows) @@ -413,11 +377,8 @@ def _purge_old_recorder_runs( ) -> None: """Purge all old recorder runs.""" # Recorder runs is small, no need to batch run it - deleted_rows = ( - session.query(RecorderRuns) - .filter(RecorderRuns.start < purge_before) - .filter(RecorderRuns.run_id != 
instance.run_history.current.run_id) - .delete(synchronize_session=False) + deleted_rows = session.execute( + delete_recorder_runs_rows(purge_before, instance.run_history.current.run_id) ) _LOGGER.debug("Deleted %s recorder_runs", deleted_rows) diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index ff4662e27b1..76098663bd7 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -1,11 +1,23 @@ """Queries for the recorder.""" from __future__ import annotations -from sqlalchemy import func, lambda_stmt, select, union_all +from collections.abc import Iterable +from datetime import datetime + +from sqlalchemy import delete, distinct, func, lambda_stmt, select, union_all, update from sqlalchemy.sql.lambdas import StatementLambdaElement from sqlalchemy.sql.selectable import Select -from .models import EventData, Events, StateAttributes, States +from .const import MAX_ROWS_TO_PURGE +from .models import ( + EventData, + Events, + RecorderRuns, + StateAttributes, + States, + StatisticsRuns, + StatisticsShortTerm, +) def find_shared_attributes_id( @@ -33,6 +45,17 @@ def _state_attrs_exist(attr: int | None) -> Select: return select(func.min(States.attributes_id)).where(States.attributes_id == attr) +def attributes_ids_exist_in_states_sqlite( + attributes_ids: Iterable[int], +) -> StatementLambdaElement: + """Find attributes ids that exist in the states table.""" + return lambda_stmt( + lambda: select(distinct(States.attributes_id)).filter( + States.attributes_id.in_(attributes_ids) + ) + ) + + def attributes_ids_exist_in_states( attr1: int, attr2: int | None, @@ -245,6 +268,15 @@ def attributes_ids_exist_in_states( ) +def data_ids_exist_in_events_sqlite( + data_ids: Iterable[int], +) -> StatementLambdaElement: + """Find data ids that exist in the events table.""" + return lambda_stmt( + lambda: select(distinct(Events.data_id)).filter(Events.data_id.in_(data_ids)) + ) 
+ + def _event_data_id_exist(data_id: int | None) -> Select: """Check if a event data id exists in the events table.""" return select(func.min(Events.data_id)).where(Events.data_id == data_id) @@ -460,3 +492,132 @@ def data_ids_exist_in_events( _event_data_id_exist(id100), ) ) + + +def disconnect_states_rows(state_ids: Iterable[int]) -> StatementLambdaElement: + """Disconnect states rows.""" + return lambda_stmt( + lambda: update(States) + .where(States.old_state_id.in_(state_ids)) + .values(old_state_id=None) + .execution_options(synchronize_session=False) + ) + + +def delete_states_rows(state_ids: Iterable[int]) -> StatementLambdaElement: + """Delete states rows.""" + return lambda_stmt( + lambda: delete(States) + .where(States.state_id.in_(state_ids)) + .execution_options(synchronize_session=False) + ) + + +def delete_event_data_rows(data_ids: Iterable[int]) -> StatementLambdaElement: + """Delete event_data rows.""" + return lambda_stmt( + lambda: delete(EventData) + .where(EventData.data_id.in_(data_ids)) + .execution_options(synchronize_session=False) + ) + + +def delete_states_attributes_rows( + attributes_ids: Iterable[int], +) -> StatementLambdaElement: + """Delete states_attributes rows.""" + return lambda_stmt( + lambda: delete(StateAttributes) + .where(StateAttributes.attributes_id.in_(attributes_ids)) + .execution_options(synchronize_session=False) + ) + + +def delete_statistics_runs_rows( + statistics_runs: Iterable[int], +) -> StatementLambdaElement: + """Delete statistics_runs rows.""" + return lambda_stmt( + lambda: delete(StatisticsRuns) + .where(StatisticsRuns.run_id.in_(statistics_runs)) + .execution_options(synchronize_session=False) + ) + + +def delete_statistics_short_term_rows( + short_term_statistics: Iterable[int], +) -> StatementLambdaElement: + """Delete statistics_short_term rows.""" + return lambda_stmt( + lambda: delete(StatisticsShortTerm) + .where(StatisticsShortTerm.id.in_(short_term_statistics)) + 
.execution_options(synchronize_session=False) + ) + + +def delete_event_rows( + event_ids: Iterable[int], +) -> StatementLambdaElement: + """Delete events rows.""" + return lambda_stmt( + lambda: delete(Events) + .where(Events.event_id.in_(event_ids)) + .execution_options(synchronize_session=False) + ) + + +def delete_recorder_runs_rows( + purge_before: datetime, current_run_id: int +) -> StatementLambdaElement: + """Delete recorder_runs rows.""" + return lambda_stmt( + lambda: delete(RecorderRuns) + .filter(RecorderRuns.start < purge_before) + .filter(RecorderRuns.run_id != current_run_id) + .execution_options(synchronize_session=False) + ) + + +def find_events_to_purge(purge_before: datetime) -> StatementLambdaElement: + """Find events to purge.""" + return lambda_stmt( + lambda: select(Events.event_id, Events.data_id) + .filter(Events.time_fired < purge_before) + .limit(MAX_ROWS_TO_PURGE) + ) + + +def find_states_to_purge(purge_before: datetime) -> StatementLambdaElement: + """Find states to purge.""" + return lambda_stmt( + lambda: select(States.state_id, States.attributes_id) + .filter(States.last_updated < purge_before) + .limit(MAX_ROWS_TO_PURGE) + ) + + +def find_short_term_statistics_to_purge( + purge_before: datetime, +) -> StatementLambdaElement: + """Find short term statistics to purge.""" + return lambda_stmt( + lambda: select(StatisticsShortTerm.id) + .filter(StatisticsShortTerm.start < purge_before) + .limit(MAX_ROWS_TO_PURGE) + ) + + +def find_statistics_runs_to_purge( + purge_before: datetime, +) -> StatementLambdaElement: + """Find statistics_runs to purge.""" + return lambda_stmt( + lambda: select(StatisticsRuns.run_id) + .filter(StatisticsRuns.start < purge_before) + .limit(MAX_ROWS_TO_PURGE) + ) + + +def find_latest_statistics_runs_run_id() -> StatementLambdaElement: + """Find the latest statistics_runs run_id.""" + return lambda_stmt(lambda: select(func.max(StatisticsRuns.run_id)))