Increase max bind vars based on database version (#101464)

Authored by J. Nick Koston on 2023-10-06 00:59:38 -05:00; committed by GitHub
parent c7d533d427
commit da1d5fc862
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 237 additions and 157 deletions

View File

@@ -30,6 +30,12 @@ QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY = 0.65
 # have upgraded their sqlite version
 SQLITE_MAX_BIND_VARS = 998
+# The maximum bind vars for sqlite 3.32.0 and above, but
+# capped at 4000 to avoid performance issues
+SQLITE_MODERN_MAX_BIND_VARS = 4000
+DEFAULT_MAX_BIND_VARS = 4000
 DB_WORKER_PREFIX = "DbWorker"

 ALL_DOMAIN_EXCLUDE_ATTRS = {ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES}
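Why these constants exist: every id placed in an IN (...) clause or bulk update consumes one bind variable, and SQLite builds older than 3.32.0 reject statements with more than 999 of them (3.32.0 raised the default limit to 32766), so the recorder splits large operations into chunks of at most max_bind_vars. A minimal sketch of the chunking arithmetic, using a hypothetical chunked helper rather than the recorder's own homeassistant.components.recorder.util.chunked:

from itertools import islice
from typing import Iterable, Iterator

def chunked(iterable: Iterable[int], chunk_size: int) -> Iterator[list[int]]:
    """Yield successive lists of at most chunk_size items."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk

# Deleting 10_000 rows with a 998 bind-variable cap takes 11 statements,
# each of the form "DELETE FROM x WHERE id IN (?, ?, ...)".
ids = range(10_000)
batches = list(chunked(ids, 998))
assert len(batches) == 11

Even where the engine allows far larger statements, the diff caps every dialect at 4000 bind variables; per the new comment in const.py, very large IN lists are a performance liability rather than a hard limit.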

View File

@@ -55,6 +55,7 @@ from .const import (
     MYSQLDB_PYMYSQL_URL_PREFIX,
     MYSQLDB_URL_PREFIX,
     QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
+    SQLITE_MAX_BIND_VARS,
     SQLITE_URL_PREFIX,
     STATES_META_SCHEMA_VERSION,
     STATISTICS_ROWS_SCHEMA_VERSION,
@@ -242,6 +243,13 @@ class Recorder(threading.Thread):
         self._dialect_name: SupportedDialect | None = None
         self.enabled = True
+
+        # For safety we default to the lowest value for max_bind_vars
+        # of all the DB types (SQLITE_MAX_BIND_VARS).
+        #
+        # We update the value once we connect to the DB
+        # and determine what is actually supported.
+        self.max_bind_vars = SQLITE_MAX_BIND_VARS

     @property
     def backlog(self) -> int:
         """Return the number of items in the recorder backlog."""
@@ -1351,6 +1359,7 @@ class Recorder(threading.Thread):
                 not self._completed_first_database_setup,
             ):
                 self.database_engine = database_engine
+                self.max_bind_vars = database_engine.max_bind_vars
             self._completed_first_database_setup = True

     def _setup_connection(self) -> None:

View File

@@ -1366,7 +1366,9 @@ def migrate_states_context_ids(instance: Recorder) -> bool:
     session_maker = instance.get_session
     _LOGGER.debug("Migrating states context_ids to binary format")
     with session_scope(session=session_maker()) as session:
-        if states := session.execute(find_states_context_ids_to_migrate()).all():
+        if states := session.execute(
+            find_states_context_ids_to_migrate(instance.max_bind_vars)
+        ).all():
             session.execute(
                 update(States),
                 [
@@ -1401,7 +1403,9 @@ def migrate_events_context_ids(instance: Recorder) -> bool:
     session_maker = instance.get_session
     _LOGGER.debug("Migrating context_ids to binary format")
     with session_scope(session=session_maker()) as session:
-        if events := session.execute(find_events_context_ids_to_migrate()).all():
+        if events := session.execute(
+            find_events_context_ids_to_migrate(instance.max_bind_vars)
+        ).all():
             session.execute(
                 update(Events),
                 [
@@ -1436,7 +1440,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
     _LOGGER.debug("Migrating event_types")
     event_type_manager = instance.event_type_manager
     with session_scope(session=session_maker()) as session:
-        if events := session.execute(find_event_type_to_migrate()).all():
+        if events := session.execute(
+            find_event_type_to_migrate(instance.max_bind_vars)
+        ).all():
             event_types = {event_type for _, event_type in events}
             if None in event_types:
                 # event_type should never be None but we need to be defensive
@@ -1505,7 +1511,9 @@ def migrate_entity_ids(instance: Recorder) -> bool:
     _LOGGER.debug("Migrating entity_ids")
     states_meta_manager = instance.states_meta_manager
     with session_scope(session=instance.get_session()) as session:
-        if states := session.execute(find_entity_ids_to_migrate()).all():
+        if states := session.execute(
+            find_entity_ids_to_migrate(instance.max_bind_vars)
+        ).all():
             entity_ids = {entity_id for _, entity_id in states}
             if None in entity_ids:
                 # entity_id should never be None but we need to be defensive

View File

@@ -18,6 +18,7 @@ class DatabaseEngine:
     dialect: SupportedDialect
     optimizer: DatabaseOptimizer
+    max_bind_vars: int
     version: AwesomeVersion | None

View File

@@ -12,7 +12,6 @@ from sqlalchemy.orm.session import Session
 import homeassistant.util.dt as dt_util

-from .const import SQLITE_MAX_BIND_VARS
 from .db_schema import Events, States, StatesMeta
 from .models import DatabaseEngine
 from .queries import (
@@ -72,7 +71,7 @@ def purge_old_data(
         purge_before.isoformat(sep=" ", timespec="seconds"),
     )
     with session_scope(session=instance.get_session()) as session:
-        # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states or events record
+        # Purge a max of max_bind_vars, based on the oldest states or events record
         has_more_to_purge = False
         if instance.use_legacy_events_index and _purging_legacy_format(session):
             _LOGGER.debug(
@@ -93,9 +92,11 @@ def purge_old_data(
                 instance, session, events_batch_size, purge_before
             )
-        statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
+        statistics_runs = _select_statistics_runs_to_purge(
+            session, purge_before, instance.max_bind_vars
+        )
         short_term_statistics = _select_short_term_statistics_to_purge(
-            session, purge_before
+            session, purge_before, instance.max_bind_vars
         )
         if statistics_runs:
             _purge_statistics_runs(session, statistics_runs)
@@ -141,7 +142,7 @@ def _purge_legacy_format(
             attributes_ids,
             data_ids,
         ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
-            session, purge_before
+            session, purge_before, instance.max_bind_vars
         )
         _purge_state_ids(instance, session, state_ids)
         _purge_unused_attributes_ids(instance, session, attributes_ids)
@@ -157,7 +158,7 @@ def _purge_legacy_format(
             detached_state_ids,
             detached_attributes_ids,
         ) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
-            session, purge_before
+            session, purge_before, instance.max_bind_vars
         )
         _purge_state_ids(instance, session, detached_state_ids)
         _purge_unused_attributes_ids(instance, session, detached_attributes_ids)
@@ -187,11 +188,12 @@ def _purge_states_and_attributes_ids(
     # There are more states relative to attributes_ids so
     # we purge enough state_ids to try to generate a full
     # size batch of attributes_ids that will be around the size
-    # SQLITE_MAX_BIND_VARS
+    # max_bind_vars
     attributes_ids_batch: set[int] = set()
+    max_bind_vars = instance.max_bind_vars
     for _ in range(states_batch_size):
         state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
-            session, purge_before
+            session, purge_before, max_bind_vars
         )
         if not state_ids:
             has_remaining_state_ids_to_purge = False
@@ -221,10 +223,13 @@ def _purge_events_and_data_ids(
     # There are more events relative to data_ids so
     # we purge enough event_ids to try to generate a full
     # size batch of data_ids that will be around the size
-    # SQLITE_MAX_BIND_VARS
+    # max_bind_vars
     data_ids_batch: set[int] = set()
+    max_bind_vars = instance.max_bind_vars
     for _ in range(events_batch_size):
-        event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before)
+        event_ids, data_ids = _select_event_data_ids_to_purge(
+            session, purge_before, max_bind_vars
+        )
         if not event_ids:
             has_remaining_event_ids_to_purge = False
             break
@@ -240,13 +245,13 @@ def _purge_events_and_data_ids(
 def _select_state_attributes_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int]]:
     """Return sets of state and attribute ids to purge."""
     state_ids = set()
     attributes_ids = set()
     for state_id, attributes_id in session.execute(
-        find_states_to_purge(dt_util.utc_to_timestamp(purge_before))
+        find_states_to_purge(dt_util.utc_to_timestamp(purge_before), max_bind_vars)
     ).all():
         state_ids.add(state_id)
         if attributes_id:
@@ -260,13 +265,13 @@ def _select_state_attributes_ids_to_purge(
 def _select_event_data_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int]]:
     """Return sets of event and data ids to purge."""
     event_ids = set()
     data_ids = set()
     for event_id, data_id in session.execute(
-        find_events_to_purge(dt_util.utc_to_timestamp(purge_before))
+        find_events_to_purge(dt_util.utc_to_timestamp(purge_before), max_bind_vars)
     ).all():
         event_ids.add(event_id)
         if data_id:
@@ -323,7 +328,7 @@ def _select_unused_attributes_ids(
     #
     # We used to generate a query based on how many attribute_ids to find but
     # that meant sqlalchemy Transparent SQL Compilation Caching was working against
-    # us by cached up to SQLITE_MAX_BIND_VARS different statements which could be
+    # us by cached up to max_bind_vars different statements which could be
     # up to 500MB for large database due to the complexity of the ORM objects.
     #
     # We now break the query into groups of 100 and use a lambda_stmt to ensure
@@ -405,13 +410,15 @@ def _purge_unused_data_ids(
 def _select_statistics_runs_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> list[int]:
     """Return a list of statistic runs to purge.

     Takes care to keep the newest run.
     """
-    statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all()
+    statistic_runs = session.execute(
+        find_statistics_runs_to_purge(purge_before, max_bind_vars)
+    ).all()
     statistic_runs_list = [run_id for (run_id,) in statistic_runs]
     # Exclude the newest statistics run
     if (
@@ -424,18 +431,18 @@ def _select_statistics_runs_to_purge(
 def _select_short_term_statistics_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> list[int]:
     """Return a list of short term statistics to purge."""
     statistics = session.execute(
-        find_short_term_statistics_to_purge(purge_before)
+        find_short_term_statistics_to_purge(purge_before, max_bind_vars)
     ).all()
     _LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
     return [statistic_id for (statistic_id,) in statistics]


 def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int]]:
     """Return a list of state, and attribute ids to purge.
@@ -445,7 +452,7 @@ def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
     """
     states = session.execute(
         find_legacy_detached_states_and_attributes_to_purge(
-            dt_util.utc_to_timestamp(purge_before)
+            dt_util.utc_to_timestamp(purge_before), max_bind_vars
         )
     ).all()
     _LOGGER.debug("Selected %s state ids to remove", len(states))
@@ -460,7 +467,7 @@ def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
 def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int], set[int], set[int]]:
     """Return a list of event, state, and attribute ids to purge linked by the event_id.
@@ -470,7 +477,7 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
     """
     events = session.execute(
         find_legacy_event_state_and_attributes_and_data_ids_to_purge(
-            dt_util.utc_to_timestamp(purge_before)
+            dt_util.utc_to_timestamp(purge_before), max_bind_vars
         )
     ).all()
     _LOGGER.debug("Selected %s event ids to remove", len(events))
@@ -511,8 +518,8 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int])
 def _purge_batch_attributes_ids(
     instance: Recorder, session: Session, attributes_ids: set[int]
 ) -> None:
-    """Delete old attributes ids in batches of SQLITE_MAX_BIND_VARS."""
-    for attributes_ids_chunk in chunked(attributes_ids, SQLITE_MAX_BIND_VARS):
+    """Delete old attributes ids in batches of max_bind_vars."""
+    for attributes_ids_chunk in chunked(attributes_ids, instance.max_bind_vars):
         deleted_rows = session.execute(
             delete_states_attributes_rows(attributes_ids_chunk)
         )
@@ -525,8 +532,8 @@ def _purge_batch_attributes_ids(
 def _purge_batch_data_ids(
     instance: Recorder, session: Session, data_ids: set[int]
 ) -> None:
-    """Delete old event data ids in batches of SQLITE_MAX_BIND_VARS."""
-    for data_ids_chunk in chunked(data_ids, SQLITE_MAX_BIND_VARS):
+    """Delete old event data ids in batches of max_bind_vars."""
+    for data_ids_chunk in chunked(data_ids, instance.max_bind_vars):
         deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
         _LOGGER.debug("Deleted %s data events", deleted_rows)
@@ -671,7 +678,7 @@ def _purge_filtered_states(
             session.query(States.state_id, States.attributes_id, States.event_id)
             .filter(States.metadata_id.in_(metadata_ids_to_purge))
             .filter(States.last_updated_ts < purge_before_timestamp)
-            .limit(SQLITE_MAX_BIND_VARS)
+            .limit(instance.max_bind_vars)
             .all()
         )
         if not to_purge:
@@ -709,7 +716,7 @@ def _purge_filtered_events(
             session.query(Events.event_id, Events.data_id)
             .filter(Events.event_type_id.in_(excluded_event_type_ids))
             .filter(Events.time_fired_ts < purge_before_timestamp)
-            .limit(SQLITE_MAX_BIND_VARS)
+            .limit(instance.max_bind_vars)
             .all()
         )
         if not to_purge:
@@ -760,7 +767,7 @@ def purge_entity_data(
     if not selected_metadata_ids:
         return True

-    # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states
+    # Purge a max of max_bind_vars, based on the oldest states
     # or events record.
     if not _purge_filtered_states(
         instance,
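All of the purge helpers above follow the same shape: select at most max_bind_vars ids, then delete them with one IN (...) statement per chunk, so no single statement exceeds the engine's bind-variable budget. A rough sketch of that pattern in plain SQLAlchemy Core, with a hypothetical table and throwaway in-memory engine standing in for the recorder's real schema and session handling:

from sqlalchemy import Column, Integer, MetaData, Table, create_engine, delete, select

metadata = MetaData()
# Hypothetical stand-in for the recorder's state_attributes table.
attrs = Table(
    "state_attributes", metadata, Column("attributes_id", Integer, primary_key=True)
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

def purge_batch(conn, ids: set[int], max_bind_vars: int) -> None:
    """Delete rows in chunks no larger than the bind-variable budget."""
    id_list = list(ids)
    for start in range(0, len(id_list), max_bind_vars):
        chunk = id_list[start : start + max_bind_vars]
        # One bind variable per id, so each statement stays under the cap.
        conn.execute(delete(attrs).where(attrs.c.attributes_id.in_(chunk)))

with engine.begin() as conn:
    conn.execute(attrs.insert(), [{"attributes_id": i} for i in range(1, 2501)])
    purge_batch(conn, set(range(1, 2001)), max_bind_vars=998)
    remaining = conn.execute(select(attrs.c.attributes_id)).fetchall()
    assert len(remaining) == 500  # 2000 ids removed in three chunked statements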

View File

@@ -8,7 +8,6 @@ from sqlalchemy import delete, distinct, func, lambda_stmt, select, union_all, u
 from sqlalchemy.sql.lambdas import StatementLambdaElement
 from sqlalchemy.sql.selectable import Select

-from .const import SQLITE_MAX_BIND_VARS
 from .db_schema import (
     EventData,
     Events,
@@ -612,44 +611,48 @@ def delete_recorder_runs_rows(
     )


-def find_events_to_purge(purge_before: float) -> StatementLambdaElement:
+def find_events_to_purge(
+    purge_before: float, max_bind_vars: int
+) -> StatementLambdaElement:
     """Find events to purge."""
     return lambda_stmt(
         lambda: select(Events.event_id, Events.data_id)
         .filter(Events.time_fired_ts < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )


-def find_states_to_purge(purge_before: float) -> StatementLambdaElement:
+def find_states_to_purge(
+    purge_before: float, max_bind_vars: int
+) -> StatementLambdaElement:
     """Find states to purge."""
     return lambda_stmt(
         lambda: select(States.state_id, States.attributes_id)
         .filter(States.last_updated_ts < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )


 def find_short_term_statistics_to_purge(
-    purge_before: datetime,
+    purge_before: datetime, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find short term statistics to purge."""
     purge_before_ts = purge_before.timestamp()
     return lambda_stmt(
         lambda: select(StatisticsShortTerm.id)
         .filter(StatisticsShortTerm.start_ts < purge_before_ts)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )


 def find_statistics_runs_to_purge(
-    purge_before: datetime,
+    purge_before: datetime, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find statistics_runs to purge."""
     return lambda_stmt(
         lambda: select(StatisticsRuns.run_id)
         .filter(StatisticsRuns.start < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
@@ -659,7 +662,7 @@ def find_latest_statistics_runs_run_id() -> StatementLambdaElement:


 def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
-    purge_before: float,
+    purge_before: float, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find the latest row in the legacy format to purge."""
     return lambda_stmt(
@@ -668,12 +671,12 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
         )
         .outerjoin(States, Events.event_id == States.event_id)
         .filter(Events.time_fired_ts < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )


 def find_legacy_detached_states_and_attributes_to_purge(
-    purge_before: float,
+    purge_before: float, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find states rows with event_id set but not linked event_id in Events."""
     return lambda_stmt(
@@ -684,7 +687,7 @@ def find_legacy_detached_states_and_attributes_to_purge(
             (States.last_updated_ts < purge_before) | States.last_updated_ts.is_(None)
         )
         .filter(Events.event_id.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
@@ -693,7 +696,7 @@ def find_legacy_row() -> StatementLambdaElement:
     return lambda_stmt(lambda: select(func.max(States.event_id)))


-def find_events_context_ids_to_migrate() -> StatementLambdaElement:
+def find_events_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find events context_ids to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -704,11 +707,11 @@ def find_events_context_ids_to_migrate() -> StatementLambdaElement:
             Events.context_parent_id,
         )
         .filter(Events.context_id_bin.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )


-def find_event_type_to_migrate() -> StatementLambdaElement:
+def find_event_type_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find events event_type to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -716,11 +719,11 @@ def find_event_type_to_migrate() -> StatementLambdaElement:
             Events.event_type,
         )
         .filter(Events.event_type_id.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )


-def find_entity_ids_to_migrate() -> StatementLambdaElement:
+def find_entity_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find entity_id to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -728,7 +731,7 @@ def find_entity_ids_to_migrate() -> StatementLambdaElement:
             States.entity_id,
         )
         .filter(States.metadata_id.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
@@ -792,7 +795,7 @@ def has_entity_ids_to_migrate() -> StatementLambdaElement:
     )


-def find_states_context_ids_to_migrate() -> StatementLambdaElement:
+def find_states_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find events context_ids to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -803,7 +806,7 @@ def find_states_context_ids_to_migrate() -> StatementLambdaElement:
             States.context_parent_id,
         )
         .filter(States.context_id_bin.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
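Threading max_bind_vars through these query builders instead of reading a module constant also keeps the lambda_stmt cache small: the limit becomes a bound parameter of one cached statement rather than a different compiled statement per value. A condensed sketch of the pattern against a minimal, hypothetical States model (not the recorder's real db_schema):

from sqlalchemy import Column, Float, Integer, lambda_stmt, select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.sql.lambdas import StatementLambdaElement

class Base(DeclarativeBase):
    pass

class States(Base):
    """Hypothetical minimal stand-in for the recorder's States table."""

    __tablename__ = "states"
    state_id = Column(Integer, primary_key=True)
    last_updated_ts = Column(Float)

def find_states_to_purge(
    purge_before: float, max_bind_vars: int
) -> StatementLambdaElement:
    """Select at most max_bind_vars state ids older than purge_before."""
    return lambda_stmt(
        # Closure values such as purge_before and max_bind_vars are tracked
        # as bound parameters, so the cached statement is reused across calls.
        lambda: select(States.state_id)
        .filter(States.last_updated_ts < purge_before)
        .limit(max_bind_vars)
    )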

View File

@@ -10,7 +10,6 @@ from sqlalchemy.orm.session import Session
 from homeassistant.core import Event
 from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS

-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import EventData
 from ..queries import get_shared_event_datas
 from ..util import chunked, execute_stmt_lambda_element
@@ -95,7 +94,7 @@ class EventDataManager(BaseLRUTableManager[EventData]):
         """
         results: dict[str, int | None] = {}
         with session.no_autoflush:
-            for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
+            for hashs_chunk in chunked(hashes, self.recorder.max_bind_vars):
                 for data_id, shared_data in execute_stmt_lambda_element(
                     session, get_shared_event_datas(hashs_chunk), orm_rows=False
                 ):
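The table managers (this one and the three below) now read the budget from their parent recorder, so lookup chunk sizes track whatever the connected engine reports. A toy sketch of that delegation, with hypothetical stub classes in place of the real manager API:

from itertools import islice

class RecorderStub:
    """Hypothetical stand-in exposing only the bind-variable budget."""

    max_bind_vars = 998

class EventDataManagerStub:
    def __init__(self, recorder: RecorderStub) -> None:
        self.recorder = recorder

    def resolve_ids(self, hashes: list[int], lookup) -> dict[int, int]:
        """Resolve hashes to ids, querying at most max_bind_vars per statement."""
        results: dict[int, int] = {}
        it = iter(hashes)
        while chunk := list(islice(it, self.recorder.max_bind_vars)):
            # 'lookup' stands in for one SELECT ... WHERE hash IN (...) round trip.
            results.update(lookup(chunk))
        return results

manager = EventDataManagerStub(RecorderStub())
resolved = manager.resolve_ids(list(range(2500)), lambda chunk: {h: h for h in chunk})
assert len(resolved) == 2500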

View File

@@ -9,7 +9,6 @@ from sqlalchemy.orm.session import Session
 from homeassistant.core import Event

-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import EventTypes
 from ..queries import find_event_type_ids
 from ..tasks import RefreshEventTypesTask
@@ -78,7 +77,7 @@ class EventTypeManager(BaseLRUTableManager[EventTypes]):
             return results

         with session.no_autoflush:
-            for missing_chunk in chunked(missing, SQLITE_MAX_BIND_VARS):
+            for missing_chunk in chunked(missing, self.recorder.max_bind_vars):
                 for event_type_id, event_type in execute_stmt_lambda_element(
                     session, find_event_type_ids(missing_chunk), orm_rows=False
                 ):

View File

@@ -11,7 +11,6 @@ from homeassistant.core import Event
 from homeassistant.helpers.entity import entity_sources
 from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS

-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import StateAttributes
 from ..queries import get_shared_attributes
 from ..util import chunked, execute_stmt_lambda_element
@@ -108,7 +107,7 @@ class StateAttributesManager(BaseLRUTableManager[StateAttributes]):
         """
         results: dict[str, int | None] = {}
         with session.no_autoflush:
-            for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
+            for hashs_chunk in chunked(hashes, self.recorder.max_bind_vars):
                 for attributes_id, shared_attrs in execute_stmt_lambda_element(
                     session, get_shared_attributes(hashs_chunk), orm_rows=False
                 ):

View File

@@ -8,7 +8,6 @@ from sqlalchemy.orm.session import Session
 from homeassistant.core import Event

-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import StatesMeta
 from ..queries import find_all_states_metadata_ids, find_states_metadata_ids
 from ..util import chunked, execute_stmt_lambda_element
@@ -104,7 +103,7 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
         update_cache = from_recorder or not self._did_first_load

         with session.no_autoflush:
-            for missing_chunk in chunked(missing, SQLITE_MAX_BIND_VARS):
+            for missing_chunk in chunked(missing, self.recorder.max_bind_vars):
                 for metadata_id, entity_id in execute_stmt_lambda_element(
                     session, find_states_metadata_ids(missing_chunk)
                 ):

View File

@@ -31,7 +31,15 @@ from homeassistant.core import HomeAssistant, callback
 from homeassistant.helpers import config_validation as cv, issue_registry as ir
 import homeassistant.util.dt as dt_util

-from .const import DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX, SupportedDialect
+from .const import (
+    DATA_INSTANCE,
+    DEFAULT_MAX_BIND_VARS,
+    DOMAIN,
+    SQLITE_MAX_BIND_VARS,
+    SQLITE_MODERN_MAX_BIND_VARS,
+    SQLITE_URL_PREFIX,
+    SupportedDialect,
+)
 from .db_schema import (
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
@@ -87,6 +95,7 @@ MARIADB_WITH_FIXED_IN_QUERIES_108 = _simple_version("10.8.4")
 MIN_VERSION_MYSQL = _simple_version("8.0.0")
 MIN_VERSION_PGSQL = _simple_version("12.0")
 MIN_VERSION_SQLITE = _simple_version("3.31.0")
+MIN_VERSION_SQLITE_MODERN_BIND_VARS = _simple_version("3.32.0")

 # This is the maximum time after the recorder ends the session
@@ -471,6 +480,7 @@ def setup_connection_for_dialect(
     version: AwesomeVersion | None = None
     slow_range_in_select = False
     if dialect_name == SupportedDialect.SQLITE:
+        max_bind_vars = SQLITE_MAX_BIND_VARS
         if first_connection:
             old_isolation = dbapi_connection.isolation_level  # type: ignore[attr-defined]
             dbapi_connection.isolation_level = None  # type: ignore[attr-defined]
@@ -488,6 +498,9 @@ def setup_connection_for_dialect(
                     version or version_string, "SQLite", MIN_VERSION_SQLITE
                 )

+        if version and version > MIN_VERSION_SQLITE_MODERN_BIND_VARS:
+            max_bind_vars = SQLITE_MODERN_MAX_BIND_VARS
+
         # The upper bound on the cache size is approximately 16MiB of memory
         execute_on_connection(dbapi_connection, "PRAGMA cache_size = -16384")
@@ -506,6 +519,7 @@ def setup_connection_for_dialect(
         execute_on_connection(dbapi_connection, "PRAGMA foreign_keys=ON")

     elif dialect_name == SupportedDialect.MYSQL:
+        max_bind_vars = DEFAULT_MAX_BIND_VARS
         execute_on_connection(dbapi_connection, "SET session wait_timeout=28800")
         if first_connection:
             result = query_on_connection(dbapi_connection, "SELECT VERSION()")
@@ -546,6 +560,7 @@ def setup_connection_for_dialect(
         # Ensure all times are using UTC to avoid issues with daylight savings
         execute_on_connection(dbapi_connection, "SET time_zone = '+00:00'")
     elif dialect_name == SupportedDialect.POSTGRESQL:
+        max_bind_vars = DEFAULT_MAX_BIND_VARS
         if first_connection:
             # server_version_num was added in 2006
             result = query_on_connection(dbapi_connection, "SHOW server_version")
@@ -566,6 +581,7 @@ def setup_connection_for_dialect(
         dialect=SupportedDialect(dialect_name),
         version=version,
         optimizer=DatabaseOptimizer(slow_range_in_select=slow_range_in_select),
+        max_bind_vars=max_bind_vars,
     )
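Put together, the util.py changes boil down to a per-dialect decision: SQLite keeps the conservative 998 limit unless the reported library version is above 3.32.0, while MySQL/MariaDB and PostgreSQL get the 4000 default. A condensed, illustrative sketch of that decision (choose_max_bind_vars is a hypothetical helper, not this module's API):

from awesomeversion import AwesomeVersion

SQLITE_MAX_BIND_VARS = 998
SQLITE_MODERN_MAX_BIND_VARS = 4000
DEFAULT_MAX_BIND_VARS = 4000
MIN_VERSION_SQLITE_MODERN_BIND_VARS = AwesomeVersion("3.32.0")

def choose_max_bind_vars(dialect: str, version: AwesomeVersion | None) -> int:
    """Pick a bind-variable budget for a dialect/version pair (illustrative only)."""
    if dialect == "sqlite":
        # Mirrors the committed comparison against MIN_VERSION_SQLITE_MODERN_BIND_VARS.
        if version and version > MIN_VERSION_SQLITE_MODERN_BIND_VARS:
            return SQLITE_MODERN_MAX_BIND_VARS
        return SQLITE_MAX_BIND_VARS
    # MySQL/MariaDB and PostgreSQL allow far more bind variables than SQLite,
    # but the recorder still caps them at 4000 to bound statement size.
    return DEFAULT_MAX_BIND_VARS

assert choose_max_bind_vars("sqlite", AwesomeVersion("3.31.0")) == 998
assert choose_max_bind_vars("sqlite", AwesomeVersion("3.41.2")) == 4000
assert choose_max_bind_vars("postgresql", None) == 4000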

View File

@@ -10,11 +10,7 @@ from sqlalchemy.exc import DatabaseError, OperationalError
 from sqlalchemy.orm.session import Session

 from homeassistant.components import recorder
-from homeassistant.components.recorder import purge, queries
-from homeassistant.components.recorder.const import (
-    SQLITE_MAX_BIND_VARS,
-    SupportedDialect,
-)
+from homeassistant.components.recorder.const import SupportedDialect
 from homeassistant.components.recorder.db_schema import (
     Events,
     EventTypes,
@@ -71,6 +67,39 @@ def mock_use_sqlite(request):
     yield


+async def test_purge_big_database(
+    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
+) -> None:
+    """Test deleting 2/3 old states from a big database."""
+    instance = await async_setup_recorder_instance(hass)
+
+    for _ in range(25):
+        await _add_test_states(hass, wait_recording_done=False)
+    await async_wait_recording_done(hass)
+
+    with patch.object(instance, "max_bind_vars", 100), patch.object(
+        instance.database_engine, "max_bind_vars", 100
+    ), session_scope(hass=hass) as session:
+        states = session.query(States)
+        state_attributes = session.query(StateAttributes)
+        assert states.count() == 150
+        assert state_attributes.count() == 3
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            states_batch_size=1,
+            events_batch_size=1,
+            repack=False,
+        )
+        assert not finished
+        assert states.count() == 50
+        assert state_attributes.count() == 1
+
+
 async def test_purge_old_states(
     async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
 ) -> None:
@@ -628,7 +657,7 @@ async def test_purge_cutoff_date(
     service_data = {"keep_days": 2}

     # Force multiple purge batches to be run
-    rows = SQLITE_MAX_BIND_VARS + 1
+    rows = 999
     cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
     await _add_db_entries(hass, cutoff, rows)
@@ -1411,7 +1440,7 @@ async def test_purge_entities(
         assert states.count() == 0


-async def _add_test_states(hass: HomeAssistant):
+async def _add_test_states(hass: HomeAssistant, wait_recording_done: bool = True):
     """Add multiple states to the db for testing."""
     utcnow = dt_util.utcnow()
     five_days_ago = utcnow - timedelta(days=5)
@@ -1421,24 +1450,26 @@ async def _add_test_states(hass: HomeAssistant):
     async def set_state(entity_id, state, **kwargs):
         """Set the state."""
         hass.states.async_set(entity_id, state, **kwargs)
-        await hass.async_block_till_done()
-        await async_wait_recording_done(hass)
+        if wait_recording_done:
+            await hass.async_block_till_done()
+            await async_wait_recording_done(hass)

-    for event_id in range(6):
-        if event_id < 2:
-            timestamp = eleven_days_ago
-            state = f"autopurgeme_{event_id}"
-            attributes = {"autopurgeme": True, **base_attributes}
-        elif event_id < 4:
-            timestamp = five_days_ago
-            state = f"purgeme_{event_id}"
-            attributes = {"purgeme": True, **base_attributes}
-        else:
-            timestamp = utcnow
-            state = f"dontpurgeme_{event_id}"
-            attributes = {"dontpurgeme": True, **base_attributes}
-
-        with freeze_time(timestamp):
+    with freeze_time() as freezer:
+        for event_id in range(6):
+            if event_id < 2:
+                timestamp = eleven_days_ago
+                state = f"autopurgeme_{event_id}"
+                attributes = {"autopurgeme": True, **base_attributes}
+            elif event_id < 4:
+                timestamp = five_days_ago
+                state = f"purgeme_{event_id}"
+                attributes = {"purgeme": True, **base_attributes}
+            else:
+                timestamp = utcnow
+                state = f"dontpurgeme_{event_id}"
+                attributes = {"dontpurgeme": True, **base_attributes}
+
+            freezer.move_to(timestamp)
             await set_state("test.recorder2", state, attributes=attributes)
@@ -1453,18 +1484,19 @@ async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
     # thread as well can cause the test to fail
     await async_wait_recording_done(hass)

-    for _ in range(iterations):
-        for event_id in range(6):
-            if event_id < 2:
-                timestamp = eleven_days_ago
-                event_type = "EVENT_TEST_AUTOPURGE"
-            elif event_id < 4:
-                timestamp = five_days_ago
-                event_type = "EVENT_TEST_PURGE"
-            else:
-                timestamp = utcnow
-                event_type = "EVENT_TEST"
-            with freeze_time(timestamp):
+    with freeze_time() as freezer:
+        for _ in range(iterations):
+            for event_id in range(6):
+                if event_id < 2:
+                    timestamp = eleven_days_ago
+                    event_type = "EVENT_TEST_AUTOPURGE"
+                elif event_id < 4:
+                    timestamp = five_days_ago
+                    event_type = "EVENT_TEST_PURGE"
+                else:
+                    timestamp = utcnow
+                    event_type = "EVENT_TEST"
+                freezer.move_to(timestamp)
                 hass.bus.async_fire(event_type, event_data)

     await async_wait_recording_done(hass)
@@ -1605,11 +1637,11 @@ async def test_purge_many_old_events(
 ) -> None:
     """Test deleting old events."""
     old_events_count = 5
-    with patch.object(queries, "SQLITE_MAX_BIND_VARS", old_events_count), patch.object(
-        purge, "SQLITE_MAX_BIND_VARS", old_events_count
-    ):
-        instance = await async_setup_recorder_instance(hass)
+    instance = await async_setup_recorder_instance(hass)
+
+    with patch.object(instance, "max_bind_vars", old_events_count), patch.object(
+        instance.database_engine, "max_bind_vars", old_events_count
+    ):
         await _add_test_events(hass, old_events_count)

         with session_scope(hass=hass) as session:

View File

@@ -13,10 +13,7 @@ from sqlalchemy.orm.session import Session
 from homeassistant.components import recorder
 from homeassistant.components.recorder import migration
-from homeassistant.components.recorder.const import (
-    SQLITE_MAX_BIND_VARS,
-    SupportedDialect,
-)
+from homeassistant.components.recorder.const import SupportedDialect
 from homeassistant.components.recorder.history import get_significant_states
 from homeassistant.components.recorder.purge import purge_old_data
 from homeassistant.components.recorder.services import (
@@ -631,7 +628,7 @@ async def test_purge_cutoff_date(
     service_data = {"keep_days": 2}

     # Force multiple purge batches to be run
-    rows = SQLITE_MAX_BIND_VARS + 1
+    rows = 999
     cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
     await _add_db_entries(hass, cutoff, rows)
@@ -718,21 +715,22 @@ async def _add_test_states(hass: HomeAssistant):
         await hass.async_block_till_done()
         await async_wait_recording_done(hass)

-    for event_id in range(6):
-        if event_id < 2:
-            timestamp = eleven_days_ago
-            state = f"autopurgeme_{event_id}"
-            attributes = {"autopurgeme": True, **base_attributes}
-        elif event_id < 4:
-            timestamp = five_days_ago
-            state = f"purgeme_{event_id}"
-            attributes = {"purgeme": True, **base_attributes}
-        else:
-            timestamp = utcnow
-            state = f"dontpurgeme_{event_id}"
-            attributes = {"dontpurgeme": True, **base_attributes}
-
-        with freeze_time(timestamp):
+    with freeze_time() as freezer:
+        for event_id in range(6):
+            if event_id < 2:
+                timestamp = eleven_days_ago
+                state = f"autopurgeme_{event_id}"
+                attributes = {"autopurgeme": True, **base_attributes}
+            elif event_id < 4:
+                timestamp = five_days_ago
+                state = f"purgeme_{event_id}"
+                attributes = {"purgeme": True, **base_attributes}
+            else:
+                timestamp = utcnow
+                state = f"dontpurgeme_{event_id}"
+                attributes = {"dontpurgeme": True, **base_attributes}
+
+            freezer.move_to(timestamp)
             await set_state("test.recorder2", state, attributes=attributes)
@@ -952,46 +950,50 @@ async def test_purge_many_old_events(
     instance = await async_setup_recorder_instance(hass)
     await _async_attach_db_engine(hass)

-    await _add_test_events(hass, SQLITE_MAX_BIND_VARS)
+    old_events_count = 5
+    with patch.object(instance, "max_bind_vars", old_events_count), patch.object(
+        instance.database_engine, "max_bind_vars", old_events_count
+    ):
+        await _add_test_events(hass, old_events_count)

         with session_scope(hass=hass) as session:
             events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
-            assert events.count() == SQLITE_MAX_BIND_VARS * 6
+            assert events.count() == old_events_count * 6

             purge_before = dt_util.utcnow() - timedelta(days=4)

             # run purge_old_data()
             finished = purge_old_data(
                 instance,
                 purge_before,
                 repack=False,
                 states_batch_size=3,
                 events_batch_size=3,
             )
             assert not finished
-            assert events.count() == SQLITE_MAX_BIND_VARS * 3
+            assert events.count() == old_events_count * 3

             # we should only have 2 groups of events left
             finished = purge_old_data(
                 instance,
                 purge_before,
                 repack=False,
                 states_batch_size=3,
                 events_batch_size=3,
             )
             assert finished
-            assert events.count() == SQLITE_MAX_BIND_VARS * 2
+            assert events.count() == old_events_count * 2

             # we should now purge everything
             finished = purge_old_data(
                 instance,
                 dt_util.utcnow(),
                 repack=False,
                 states_batch_size=20,
                 events_batch_size=20,
             )
             assert finished
             assert events.count() == 0


 async def test_purge_can_mix_legacy_and_new_format(