Speed up nightly database purges with lambda_stmt (#71537)

This commit is contained in:
J. Nick Koston 2022-05-08 14:47:12 -05:00 committed by GitHub
parent 7c9c0e911a
commit 896bf986eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 205 additions and 83 deletions

View File

@ -7,23 +7,32 @@ from itertools import zip_longest
import logging
from typing import TYPE_CHECKING
from sqlalchemy import func
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct
from homeassistant.const import EVENT_STATE_CHANGED
from .const import MAX_ROWS_TO_PURGE, SupportedDialect
from .models import (
EventData,
Events,
RecorderRuns,
StateAttributes,
States,
StatisticsRuns,
StatisticsShortTerm,
from .models import Events, StateAttributes, States
from .queries import (
attributes_ids_exist_in_states,
attributes_ids_exist_in_states_sqlite,
data_ids_exist_in_events,
data_ids_exist_in_events_sqlite,
delete_event_data_rows,
delete_event_rows,
delete_recorder_runs_rows,
delete_states_attributes_rows,
delete_states_rows,
delete_statistics_runs_rows,
delete_statistics_short_term_rows,
disconnect_states_rows,
find_events_to_purge,
find_latest_statistics_runs_run_id,
find_short_term_statistics_to_purge,
find_states_to_purge,
find_statistics_runs_to_purge,
)
from .queries import attributes_ids_exist_in_states, data_ids_exist_in_events
from .repack import repack_database
from .util import retryable_database_job, session_scope
@ -101,19 +110,9 @@ def _select_event_state_attributes_ids_data_ids_to_purge(
session: Session, purge_before: datetime
) -> tuple[set[int], set[int], set[int], set[int]]:
"""Return a list of event, state, and attribute ids to purge."""
events = (
session.query(Events.event_id, Events.data_id)
.filter(Events.time_fired < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
events = session.execute(find_events_to_purge(purge_before)).all()
_LOGGER.debug("Selected %s event ids to remove", len(events))
states = (
session.query(States.state_id, States.attributes_id)
.filter(States.last_updated < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
states = session.execute(find_states_to_purge(purge_before)).all()
_LOGGER.debug("Selected %s state ids to remove", len(states))
event_ids = set()
state_ids = set()
@ -152,9 +151,9 @@ def _select_unused_attributes_ids(
#
seen_ids = {
state[0]
for state in session.query(distinct(States.attributes_id))
.filter(States.attributes_id.in_(attributes_ids))
.all()
for state in session.execute(
attributes_ids_exist_in_states_sqlite(attributes_ids)
).all()
}
else:
#
@ -210,9 +209,9 @@ def _select_unused_event_data_ids(
if using_sqlite:
seen_ids = {
state[0]
for state in session.query(distinct(Events.data_id))
.filter(Events.data_id.in_(data_ids))
.all()
for state in session.execute(
data_ids_exist_in_events_sqlite(data_ids)
).all()
}
else:
seen_ids = set()
@ -234,16 +233,11 @@ def _select_statistics_runs_to_purge(
session: Session, purge_before: datetime
) -> list[int]:
"""Return a list of statistic runs to purge, but take care to keep the newest run."""
statistic_runs = (
session.query(StatisticsRuns.run_id)
.filter(StatisticsRuns.start < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all()
statistic_runs_list = [run.run_id for run in statistic_runs]
# Exclude the newest statistics run
if (
last_run := session.query(func.max(StatisticsRuns.run_id)).scalar()
last_run := session.execute(find_latest_statistics_runs_run_id()).scalar()
) and last_run in statistic_runs_list:
statistic_runs_list.remove(last_run)
@ -255,12 +249,9 @@ def _select_short_term_statistics_to_purge(
session: Session, purge_before: datetime
) -> list[int]:
"""Return a list of short term statistics to purge."""
statistics = (
session.query(StatisticsShortTerm.id)
.filter(StatisticsShortTerm.start < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
statistics = session.execute(
find_short_term_statistics_to_purge(purge_before)
).all()
_LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
return [statistic.id for statistic in statistics]
@ -272,18 +263,10 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int])
# the delete does not fail due to a foreign key constraint
# since some databases (MSSQL) cannot do the ON DELETE SET NULL
# for us.
disconnected_rows = (
session.query(States)
.filter(States.old_state_id.in_(state_ids))
.update({"old_state_id": None}, synchronize_session=False)
)
disconnected_rows = session.execute(disconnect_states_rows(state_ids))
_LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)
deleted_rows = (
session.query(States)
.filter(States.state_id.in_(state_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_states_rows(state_ids))
_LOGGER.debug("Deleted %s states", deleted_rows)
# Evict eny entries in the old_states cache referring to a purged state
@ -348,12 +331,7 @@ def _purge_attributes_ids(
instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
"""Delete old attributes ids."""
deleted_rows = (
session.query(StateAttributes)
.filter(StateAttributes.attributes_id.in_(attributes_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_states_attributes_rows(attributes_ids))
_LOGGER.debug("Deleted %s attribute states", deleted_rows)
# Evict any entries in the state_attributes_ids cache referring to a purged state
@ -365,11 +343,7 @@ def _purge_event_data_ids(
) -> None:
"""Delete old event data ids."""
deleted_rows = (
session.query(EventData)
.filter(EventData.data_id.in_(data_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_event_data_rows(data_ids))
_LOGGER.debug("Deleted %s data events", deleted_rows)
# Evict any entries in the event_data_ids cache referring to a purged state
@ -378,11 +352,7 @@ def _purge_event_data_ids(
def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
"""Delete by run_id."""
deleted_rows = (
session.query(StatisticsRuns)
.filter(StatisticsRuns.run_id.in_(statistics_runs))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_statistics_runs_rows(statistics_runs))
_LOGGER.debug("Deleted %s statistic runs", deleted_rows)
@ -390,21 +360,15 @@ def _purge_short_term_statistics(
session: Session, short_term_statistics: list[int]
) -> None:
"""Delete by id."""
deleted_rows = (
session.query(StatisticsShortTerm)
.filter(StatisticsShortTerm.id.in_(short_term_statistics))
.delete(synchronize_session=False)
deleted_rows = session.execute(
delete_statistics_short_term_rows(short_term_statistics)
)
_LOGGER.debug("Deleted %s short term statistics", deleted_rows)
def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None:
"""Delete by event id."""
deleted_rows = (
session.query(Events)
.filter(Events.event_id.in_(event_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_event_rows(event_ids))
_LOGGER.debug("Deleted %s events", deleted_rows)
@ -413,11 +377,8 @@ def _purge_old_recorder_runs(
) -> None:
"""Purge all old recorder runs."""
# Recorder runs is small, no need to batch run it
deleted_rows = (
session.query(RecorderRuns)
.filter(RecorderRuns.start < purge_before)
.filter(RecorderRuns.run_id != instance.run_history.current.run_id)
.delete(synchronize_session=False)
deleted_rows = session.execute(
delete_recorder_runs_rows(purge_before, instance.run_history.current.run_id)
)
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows)

View File

@ -1,11 +1,23 @@
"""Queries for the recorder."""
from __future__ import annotations
from sqlalchemy import func, lambda_stmt, select, union_all
from collections.abc import Iterable
from datetime import datetime
from sqlalchemy import delete, distinct, func, lambda_stmt, select, union_all, update
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import Select
from .models import EventData, Events, StateAttributes, States
from .const import MAX_ROWS_TO_PURGE
from .models import (
EventData,
Events,
RecorderRuns,
StateAttributes,
States,
StatisticsRuns,
StatisticsShortTerm,
)
def find_shared_attributes_id(
@ -33,6 +45,17 @@ def _state_attrs_exist(attr: int | None) -> Select:
return select(func.min(States.attributes_id)).where(States.attributes_id == attr)
def attributes_ids_exist_in_states_sqlite(
    attributes_ids: Iterable[int],
) -> StatementLambdaElement:
    """Return a statement selecting which of the given attributes ids are still referenced by states."""
    return lambda_stmt(
        lambda: select(distinct(States.attributes_id)).where(
            States.attributes_id.in_(attributes_ids)
        )
    )
def attributes_ids_exist_in_states(
attr1: int,
attr2: int | None,
@ -245,6 +268,15 @@ def attributes_ids_exist_in_states(
)
def data_ids_exist_in_events_sqlite(
    data_ids: Iterable[int],
) -> StatementLambdaElement:
    """Return a statement selecting which of the given data ids are still referenced by events."""
    return lambda_stmt(
        lambda: select(distinct(Events.data_id)).where(Events.data_id.in_(data_ids))
    )
def _event_data_id_exist(data_id: int | None) -> Select:
    """Check if an event data id exists in the events table."""
    stmt = select(func.min(Events.data_id))
    return stmt.where(Events.data_id == data_id)
@ -460,3 +492,132 @@ def data_ids_exist_in_events(
_event_data_id_exist(id100),
)
)
def disconnect_states_rows(state_ids: Iterable[int]) -> StatementLambdaElement:
    """Return a statement clearing old_state_id references to the given states.

    Run before deleting the rows so foreign key constraints cannot fail
    on databases that do not apply ON DELETE SET NULL for us.
    """
    return lambda_stmt(
        lambda: update(States)
        .values(old_state_id=None)
        .where(States.old_state_id.in_(state_ids))
        .execution_options(synchronize_session=False)
    )
def delete_states_rows(state_ids: Iterable[int]) -> StatementLambdaElement:
    """Return a statement deleting states rows by state_id."""
    return lambda_stmt(
        lambda: delete(States)
        .execution_options(synchronize_session=False)
        .where(States.state_id.in_(state_ids))
    )
def delete_event_data_rows(data_ids: Iterable[int]) -> StatementLambdaElement:
    """Return a statement deleting event_data rows by data_id."""
    return lambda_stmt(
        lambda: delete(EventData)
        .execution_options(synchronize_session=False)
        .where(EventData.data_id.in_(data_ids))
    )
def delete_states_attributes_rows(
    attributes_ids: Iterable[int],
) -> StatementLambdaElement:
    """Return a statement deleting state_attributes rows by attributes_id."""
    return lambda_stmt(
        lambda: delete(StateAttributes)
        .execution_options(synchronize_session=False)
        .where(StateAttributes.attributes_id.in_(attributes_ids))
    )
def delete_statistics_runs_rows(
    statistics_runs: Iterable[int],
) -> StatementLambdaElement:
    """Return a statement deleting statistics_runs rows by run_id."""
    return lambda_stmt(
        lambda: delete(StatisticsRuns)
        .execution_options(synchronize_session=False)
        .where(StatisticsRuns.run_id.in_(statistics_runs))
    )
def delete_statistics_short_term_rows(
    short_term_statistics: Iterable[int],
) -> StatementLambdaElement:
    """Return a statement deleting statistics_short_term rows by id."""
    return lambda_stmt(
        lambda: delete(StatisticsShortTerm)
        .execution_options(synchronize_session=False)
        .where(StatisticsShortTerm.id.in_(short_term_statistics))
    )
def delete_event_rows(
    event_ids: Iterable[int],
) -> StatementLambdaElement:
    """Delete events rows.

    Note: the docstring previously said "statistics_short_term" — a
    copy/paste error; this statement deletes from the events table.
    synchronize_session=False skips ORM-session bookkeeping, which is
    safe here because purged rows are never touched again in-session.
    """
    return lambda_stmt(
        lambda: delete(Events)
        .where(Events.event_id.in_(event_ids))
        .execution_options(synchronize_session=False)
    )
def delete_recorder_runs_rows(
    purge_before: datetime, current_run_id: int
) -> StatementLambdaElement:
    """Delete recorder_runs rows that started before purge_before.

    The currently active run (current_run_id) is always excluded so it
    is never purged. Uses .where for consistency with the other delete
    helpers in this module (.filter is the legacy ORM synonym).
    """
    return lambda_stmt(
        lambda: delete(RecorderRuns)
        .where(RecorderRuns.start < purge_before)
        .where(RecorderRuns.run_id != current_run_id)
        .execution_options(synchronize_session=False)
    )
def find_events_to_purge(purge_before: datetime) -> StatementLambdaElement:
    """Return a statement selecting (event_id, data_id) of events fired before purge_before.

    Capped at MAX_ROWS_TO_PURGE rows so purges run in batches.
    """
    return lambda_stmt(
        lambda: select(Events.event_id, Events.data_id)
        .where(Events.time_fired < purge_before)
        .limit(MAX_ROWS_TO_PURGE)
    )
def find_states_to_purge(purge_before: datetime) -> StatementLambdaElement:
    """Return a statement selecting (state_id, attributes_id) of states last updated before purge_before.

    Capped at MAX_ROWS_TO_PURGE rows so purges run in batches.
    """
    return lambda_stmt(
        lambda: select(States.state_id, States.attributes_id)
        .where(States.last_updated < purge_before)
        .limit(MAX_ROWS_TO_PURGE)
    )
def find_short_term_statistics_to_purge(
    purge_before: datetime,
) -> StatementLambdaElement:
    """Return a statement selecting ids of short term statistics started before purge_before.

    Capped at MAX_ROWS_TO_PURGE rows so purges run in batches.
    """
    return lambda_stmt(
        lambda: select(StatisticsShortTerm.id)
        .where(StatisticsShortTerm.start < purge_before)
        .limit(MAX_ROWS_TO_PURGE)
    )
def find_statistics_runs_to_purge(
    purge_before: datetime,
) -> StatementLambdaElement:
    """Return a statement selecting run_ids of statistics_runs started before purge_before.

    Capped at MAX_ROWS_TO_PURGE rows so purges run in batches.
    """
    return lambda_stmt(
        lambda: select(StatisticsRuns.run_id)
        .where(StatisticsRuns.start < purge_before)
        .limit(MAX_ROWS_TO_PURGE)
    )
def find_latest_statistics_runs_run_id() -> StatementLambdaElement:
    """Return a statement selecting the newest statistics_runs run_id."""
    newest_run_id = lambda: select(func.max(StatisticsRuns.run_id))  # noqa: E731
    return lambda_stmt(newest_run_id)