From c2b770bcb944ed4c52f9139a76844bede95a3409 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 19 Feb 2023 20:26:38 -0600 Subject: [PATCH] Load pending state attributes and event data ids at startup (#88444) * Load pending state attributes and event data ids at startup Since we queue all events to be processed after startup we can have a thundering herd of queries to prime the LRUs of event data and state attributes ids. Since we know we are about to process a chunk of events we can fetch all the ids in two queries * lru * fix hang * Fix recorder LRU being destroyed if event session is reopened We would clear the LRU in _close_event_session but it would never get replaced with an LRU again so it would leak memory if the event session is reopened * Fix recorder LRU being destroyed if event session is reopened We would clear the LRU in _close_event_session but it would never get replaced with an LRU again so it would leak memory if the event session is reopened * cleanup --- homeassistant/components/recorder/const.py | 2 +- homeassistant/components/recorder/core.py | 129 +++++++++++++++--- .../components/recorder/db_schema.py | 3 + homeassistant/components/recorder/purge.py | 47 ++----- homeassistant/components/recorder/queries.py | 30 +++- .../components/recorder/statistics.py | 16 ++- homeassistant/components/recorder/util.py | 20 ++- tests/components/recorder/test_migrate.py | 2 + tests/components/recorder/test_purge.py | 15 +- 9 files changed, 190 insertions(+), 74 deletions(-) diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py index 379185bec7b..effecf15a8b 100644 --- a/homeassistant/components/recorder/const.py +++ b/homeassistant/components/recorder/const.py @@ -27,7 +27,7 @@ MAX_QUEUE_BACKLOG = 65000 # in https://github.com/sqlite/sqlite/commit/efdba1a8b3c6c967e7fae9c1989c40d420ce64cc # We can increase this back to 1000 once most # have upgraded their sqlite version -MAX_ROWS_TO_PURGE = 998 +SQLITE_MAX_BIND_VARS = 998 DB_WORKER_PREFIX = "DbWorker" diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 8cae7c5eadd..4e099a3b17f 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -52,6 +52,7 @@ from .const import ( MAX_QUEUE_BACKLOG, MYSQLDB_PYMYSQL_URL_PREFIX, MYSQLDB_URL_PREFIX, + SQLITE_MAX_BIND_VARS, SQLITE_URL_PREFIX, SupportedDialect, ) @@ -75,7 +76,12 @@ from .models import ( process_timestamp, ) from .pool import POOL_SIZE, MutexPool, RecorderPool -from .queries import find_shared_attributes_id, find_shared_data_id +from .queries import ( + find_shared_attributes_id, + find_shared_data_id, + get_shared_attributes, + get_shared_event_datas, +) from .run_history import RunHistory from .tasks import ( AdjustLRUSizeTask, @@ -98,6 +104,7 @@ from .tasks import ( ) from .util import ( build_mysqldb_conv, + chunked, dburl_to_path, end_incomplete_runs, is_second_sunday, @@ -681,22 +688,93 @@ class Recorder(threading.Thread): self._adjust_lru_size() self.hass.add_job(self._async_set_recorder_ready_migration_done) self._run_event_loop() + self._shutdown() def _run_event_loop(self) -> None: """Run the event loop for the recorder.""" # Use a session for the event read loop # with a commit every time the event time # has changed. This reduces the disk io. 
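# For context on the startup drain introduced below: the recorder pulls every
# task that piled up while Home Assistant was starting in one non-blocking pass
# and only then processes the batch one task at a time. A minimal, self-contained
# sketch of that pattern using only the standard library (drain_pending and its
# signature are illustrative, not the recorder's API; the recorder itself checks
# queue_.empty() with a walrus assignment instead of catching Empty):
from queue import Empty, SimpleQueue
from typing import Any


def drain_pending(queue_: SimpleQueue) -> list[Any]:
    """Collect every task already queued without waiting for new ones."""
    pending: list[Any] = []
    while True:
        try:
            pending.append(queue_.get_nowait())
        except Empty:
            # Queue is drained; the batch can now be inspected as a whole
            # (e.g. to prime caches) before each task is processed, which is
            # what _pre_process_startup_tasks does for the recorder.
            return pending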
+ queue_ = self._queue + startup_tasks: list[RecorderTask] = [] + while not queue_.empty() and (task := queue_.get_nowait()): + startup_tasks.append(task) + self._pre_process_startup_tasks(startup_tasks) + for task in startup_tasks: + self._guarded_process_one_task_or_recover(task) + self.stop_requested = False while not self.stop_requested: - task = self._queue.get() - _LOGGER.debug("Processing task: %s", task) - try: - self._process_one_task_or_recover(task) - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception("Error while processing event %s: %s", task, err) + self._guarded_process_one_task_or_recover(queue_.get()) - self._shutdown() + def _pre_process_startup_tasks(self, startup_tasks: list[RecorderTask]) -> None: + """Pre process startup tasks.""" + # Prime all the state_attributes and event_data caches + # before we start processing events + state_change_events: list[Event] = [] + non_state_change_events: list[Event] = [] + + for task in startup_tasks: + if isinstance(task, EventTask): + event_ = task.event + if event_.event_type == EVENT_STATE_CHANGED: + state_change_events.append(event_) + else: + non_state_change_events.append(event_) + + self._pre_process_state_change_events(state_change_events) + self._pre_process_non_state_change_events(non_state_change_events) + + def _pre_process_state_change_events(self, events: list[Event]) -> None: + """Load startup state attributes from the database. + + Since the _state_attributes_ids cache is empty at startup + we restore it from the database to avoid having to look up + the attributes in the database for every state change + until its primed. + """ + assert self.event_session is not None + if hashes := [ + StateAttributes.hash_shared_attrs_bytes(shared_attrs_bytes) + for event in events + if ( + shared_attrs_bytes := self._serialize_state_attributes_from_event(event) + ) + ]: + with self.event_session.no_autoflush: + for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS): + for id_, shared_attrs in self.event_session.execute( + get_shared_attributes(hash_chunk) + ).fetchall(): + self._state_attributes_ids[shared_attrs] = id_ + + def _pre_process_non_state_change_events(self, events: list[Event]) -> None: + """Load startup event attributes from the database. + + Since the _event_data_ids cache is empty at startup + we restore it from the database to avoid having to look up + the data in the database for every event until its primed. 
+ """ + assert self.event_session is not None + if hashes := [ + EventData.hash_shared_data_bytes(shared_event_bytes) + for event in events + if (shared_event_bytes := self._serialize_event_data_from_event(event)) + ]: + with self.event_session.no_autoflush: + for hash_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS): + for id_, shared_data in self.event_session.execute( + get_shared_event_datas(hash_chunk) + ).fetchall(): + self._event_data_ids[shared_data] = id_ + + def _guarded_process_one_task_or_recover(self, task: RecorderTask) -> None: + """Process a task, guarding against exceptions to ensure the loop does not collapse.""" + _LOGGER.debug("Processing task: %s", task) + try: + self._process_one_task_or_recover(task) + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error while processing event %s: %s", task, err) def _process_one_task_or_recover(self, task: RecorderTask) -> None: """Process an event, reconnect, or recover a malformed database.""" @@ -854,6 +932,14 @@ class Recorder(threading.Thread): return cast(int, data_id[0]) return None + def _serialize_event_data_from_event(self, event: Event) -> bytes | None: + """Serialize event data.""" + try: + return EventData.shared_data_bytes_from_event(event, self.dialect_name) + except JSON_ENCODE_EXCEPTIONS as ex: + _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex) + return None + def _process_non_state_changed_event_into_session(self, event: Event) -> None: """Process any event into the session except state changed.""" assert self.event_session is not None @@ -861,15 +947,8 @@ class Recorder(threading.Thread): if not event.data: self.event_session.add(dbevent) return - - try: - shared_data_bytes = EventData.shared_data_bytes_from_event( - event, self.dialect_name - ) - except JSON_ENCODE_EXCEPTIONS as ex: - _LOGGER.warning("Event is not JSON serializable: %s: %s", event, ex) + if not (shared_data_bytes := self._serialize_event_data_from_event(event)): return - shared_data = shared_data_bytes.decode("utf-8") # Matching attributes found in the pending commit if pending_event_data := self._pending_event_data.get(shared_data): @@ -892,12 +971,10 @@ class Recorder(threading.Thread): self.event_session.add(dbevent) - def _process_state_changed_event_into_session(self, event: Event) -> None: - """Process a state_changed event into the session.""" - assert self.event_session is not None + def _serialize_state_attributes_from_event(self, event: Event) -> bytes | None: + """Serialize state changed event data.""" try: - dbstate = States.from_event(event) - shared_attrs_bytes = StateAttributes.shared_attrs_bytes_from_event( + return StateAttributes.shared_attrs_bytes_from_event( event, self._entity_sources, self._exclude_attributes_by_domain, @@ -909,6 +986,15 @@ class Recorder(threading.Thread): event.data.get("new_state"), ex, ) + return None + + def _process_state_changed_event_into_session(self, event: Event) -> None: + """Process a state_changed event into the session.""" + assert self.event_session is not None + dbstate = States.from_event(event) + if not ( + shared_attrs_bytes := self._serialize_state_attributes_from_event(event) + ): return shared_attrs = shared_attrs_bytes.decode("utf-8") @@ -1256,6 +1342,7 @@ class Recorder(threading.Thread): def _shutdown(self) -> None: """Save end time for current run.""" + _LOGGER.debug("Shutting down recorder") self.hass.add_job(self._async_stop_listeners) self._stop_executor() try: diff --git a/homeassistant/components/recorder/db_schema.py 
b/homeassistant/components/recorder/db_schema.py index 0c3de80c5f3..3bdbb0d3c28 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections.abc import Callable from datetime import datetime, timedelta +from functools import lru_cache import logging import time from typing import Any, cast @@ -284,6 +285,7 @@ class EventData(Base): return json_bytes(event.data) @staticmethod + @lru_cache def hash_shared_data_bytes(shared_data_bytes: bytes) -> int: """Return the hash of json encoded shared data.""" return cast(int, fnv1a_32(shared_data_bytes)) @@ -492,6 +494,7 @@ class StateAttributes(Base): return bytes_result @staticmethod + @lru_cache(maxsize=2048) def hash_shared_attrs_bytes(shared_attrs_bytes: bytes) -> int: """Return the hash of json encoded shared attributes.""" return cast(int, fnv1a_32(shared_attrs_bytes)) diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index eb6413fa786..7ae63ef026b 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -3,10 +3,9 @@ from __future__ import annotations from collections.abc import Callable, Iterable from datetime import datetime -from functools import partial -from itertools import islice, zip_longest +from itertools import zip_longest import logging -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from sqlalchemy.engine.row import Row from sqlalchemy.orm.session import Session @@ -15,7 +14,7 @@ from sqlalchemy.sql.expression import distinct from homeassistant.const import EVENT_STATE_CHANGED import homeassistant.util.dt as dt_util -from .const import MAX_ROWS_TO_PURGE +from .const import SQLITE_MAX_BIND_VARS from .db_schema import Events, StateAttributes, States from .models import DatabaseEngine from .queries import ( @@ -40,7 +39,7 @@ from .queries import ( find_statistics_runs_to_purge, ) from .repack import repack_database -from .util import retryable_database_job, session_scope +from .util import chunked, retryable_database_job, session_scope if TYPE_CHECKING: from . import Recorder @@ -52,22 +51,6 @@ DEFAULT_STATES_BATCHES_PER_PURGE = 20 # We expect ~95% de-dupe rate DEFAULT_EVENTS_BATCHES_PER_PURGE = 15 # We expect ~92% de-dupe rate -def take(take_num: int, iterable: Iterable) -> list[Any]: - """Return first n items of the iterable as a list. - - From itertools recipes - """ - return list(islice(iterable, take_num)) - - -def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]: - """Break *iterable* into lists of length *n*. 
- - From more-itertools - """ - return iter(partial(take, chunked_num, iter(iterable)), []) - - @retryable_database_job("purge") def purge_old_data( instance: Recorder, @@ -86,7 +69,7 @@ def purge_old_data( purge_before.isoformat(sep=" ", timespec="seconds"), ) with session_scope(session=instance.get_session()) as session: - # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record + # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states or events record has_more_to_purge = False if _purging_legacy_format(session): _LOGGER.debug( @@ -174,7 +157,7 @@ def _purge_states_and_attributes_ids( # There are more states relative to attributes_ids so # we purge enough state_ids to try to generate a full # size batch of attributes_ids that will be around the size - # MAX_ROWS_TO_PURGE + # SQLITE_MAX_BIND_VARS attributes_ids_batch: set[int] = set() for _ in range(states_batch_size): state_ids, attributes_ids = _select_state_attributes_ids_to_purge( @@ -208,7 +191,7 @@ def _purge_events_and_data_ids( # There are more events relative to data_ids so # we purge enough event_ids to try to generate a full # size batch of data_ids that will be around the size - # MAX_ROWS_TO_PURGE + # SQLITE_MAX_BIND_VARS data_ids_batch: set[int] = set() for _ in range(events_batch_size): event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before) @@ -310,7 +293,7 @@ def _select_unused_attributes_ids( # # We used to generate a query based on how many attribute_ids to find but # that meant sqlalchemy Transparent SQL Compilation Caching was working against - # us by cached up to MAX_ROWS_TO_PURGE different statements which could be + # us by cached up to SQLITE_MAX_BIND_VARS different statements which could be # up to 500MB for large database due to the complexity of the ORM objects. 
# # We now break the query into groups of 100 and use a lambda_stmt to ensure @@ -525,8 +508,8 @@ def _evict_purged_attributes_from_attributes_cache( def _purge_batch_attributes_ids( instance: Recorder, session: Session, attributes_ids: set[int] ) -> None: - """Delete old attributes ids in batches of MAX_ROWS_TO_PURGE.""" - for attributes_ids_chunk in chunked(attributes_ids, MAX_ROWS_TO_PURGE): + """Delete old attributes ids in batches of SQLITE_MAX_BIND_VARS.""" + for attributes_ids_chunk in chunked(attributes_ids, SQLITE_MAX_BIND_VARS): deleted_rows = session.execute( delete_states_attributes_rows(attributes_ids_chunk) ) @@ -539,8 +522,8 @@ def _purge_batch_attributes_ids( def _purge_batch_data_ids( instance: Recorder, session: Session, data_ids: set[int] ) -> None: - """Delete old event data ids in batches of MAX_ROWS_TO_PURGE.""" - for data_ids_chunk in chunked(data_ids, MAX_ROWS_TO_PURGE): + """Delete old event data ids in batches of SQLITE_MAX_BIND_VARS.""" + for data_ids_chunk in chunked(data_ids, SQLITE_MAX_BIND_VARS): deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk)) _LOGGER.debug("Deleted %s data events", deleted_rows) @@ -624,7 +607,7 @@ def _purge_filtered_states( *( session.query(States.state_id, States.attributes_id, States.event_id) .filter(States.entity_id.in_(excluded_entity_ids)) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) .all() ) ) @@ -650,7 +633,7 @@ def _purge_filtered_events( *( session.query(Events.event_id, Events.data_id) .filter(Events.event_type.in_(excluded_event_types)) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) .all() ) ) @@ -685,7 +668,7 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) ] _LOGGER.debug("Purging entity data for %s", selected_entity_ids) if len(selected_entity_ids) > 0: - # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states + # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states # or events record. 
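# A sketch of the batching idiom this module relies on: id collections destined
# for an IN () clause are split into SQLITE_MAX_BIND_VARS-sized chunks so that
# no single statement exceeds SQLite's bind-parameter limit (999 by default in
# older SQLite builds). The session/table arguments and the assumption that the
# primary key column is named "id" are illustrative only, not the recorder's API:
from sqlalchemy import delete
from sqlalchemy.orm import Session


def delete_ids_in_chunks(
    session: Session, table, ids: set[int], chunk_size: int = 998
) -> int:
    """Delete rows by primary key, never binding more than chunk_size ids at once."""
    deleted = 0
    id_list = list(ids)
    for start in range(0, len(id_list), chunk_size):
        chunk = id_list[start : start + chunk_size]
        result = session.execute(delete(table).where(table.id.in_(chunk)))
        deleted += result.rowcount
    session.commit()
    return deleted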
_purge_filtered_states( instance, session, selected_entity_ids, database_engine diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index d12b6409b7c..d93a6b0d62b 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -8,7 +8,7 @@ from sqlalchemy import delete, distinct, func, lambda_stmt, select, union_all, u from sqlalchemy.sql.lambdas import StatementLambdaElement from sqlalchemy.sql.selectable import Select -from .const import MAX_ROWS_TO_PURGE +from .const import SQLITE_MAX_BIND_VARS from .db_schema import ( EventData, Events, @@ -20,6 +20,24 @@ from .db_schema import ( ) +def get_shared_attributes(hashes: list[int]) -> StatementLambdaElement: + """Load shared attributes from the database.""" + return lambda_stmt( + lambda: select( + StateAttributes.attributes_id, StateAttributes.shared_attrs + ).where(StateAttributes.hash.in_(hashes)) + ) + + +def get_shared_event_datas(hashes: list[int]) -> StatementLambdaElement: + """Load shared event data from the database.""" + return lambda_stmt( + lambda: select(EventData.data_id, EventData.shared_data).where( + EventData.hash.in_(hashes) + ) + ) + + def find_shared_attributes_id( data_hash: int, shared_attrs: str ) -> StatementLambdaElement: @@ -587,7 +605,7 @@ def find_events_to_purge(purge_before: float) -> StatementLambdaElement: return lambda_stmt( lambda: select(Events.event_id, Events.data_id) .filter(Events.time_fired_ts < purge_before) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) ) @@ -596,7 +614,7 @@ def find_states_to_purge(purge_before: float) -> StatementLambdaElement: return lambda_stmt( lambda: select(States.state_id, States.attributes_id) .filter(States.last_updated_ts < purge_before) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) ) @@ -608,7 +626,7 @@ def find_short_term_statistics_to_purge( return lambda_stmt( lambda: select(StatisticsShortTerm.id) .filter(StatisticsShortTerm.start_ts < purge_before_ts) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) ) @@ -619,7 +637,7 @@ def find_statistics_runs_to_purge( return lambda_stmt( lambda: select(StatisticsRuns.run_id) .filter(StatisticsRuns.start < purge_before) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) ) @@ -640,7 +658,7 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge( ) .outerjoin(States, Events.event_id == States.event_id) .filter(Events.time_fired_ts < purge_before) - .limit(MAX_ROWS_TO_PURGE) + .limit(SQLITE_MAX_BIND_VARS) ) diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py index e0d02e8a0e3..fece82b4ff6 100644 --- a/homeassistant/components/recorder/statistics.py +++ b/homeassistant/components/recorder/statistics.py @@ -56,7 +56,7 @@ from .const import ( DOMAIN, EVENT_RECORDER_5MIN_STATISTICS_GENERATED, EVENT_RECORDER_HOURLY_STATISTICS_GENERATED, - MAX_ROWS_TO_PURGE, + SQLITE_MAX_BIND_VARS, SupportedDialect, ) from .db_schema import ( @@ -441,7 +441,7 @@ def _find_duplicates( ) .filter(subquery.c.is_duplicate == 1) .order_by(table.metadata_id, table.start, table.id.desc()) - .limit(1000 * MAX_ROWS_TO_PURGE) + .limit(1000 * SQLITE_MAX_BIND_VARS) ) duplicates = execute(query) original_as_dict = {} @@ -505,10 +505,10 @@ def _delete_duplicates_from_table( if not duplicate_ids: break all_non_identical_duplicates.extend(non_identical_duplicates) - for i in range(0, len(duplicate_ids), MAX_ROWS_TO_PURGE): + for i in range(0, 
len(duplicate_ids), SQLITE_MAX_BIND_VARS): deleted_rows = ( session.query(table) - .filter(table.id.in_(duplicate_ids[i : i + MAX_ROWS_TO_PURGE])) + .filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])) .delete(synchronize_session=False) ) total_deleted_rows += deleted_rows @@ -584,7 +584,7 @@ def _find_statistics_meta_duplicates(session: Session) -> list[int]: ) .filter(subquery.c.is_duplicate == 1) .order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc()) - .limit(1000 * MAX_ROWS_TO_PURGE) + .limit(1000 * SQLITE_MAX_BIND_VARS) ) duplicates = execute(query) statistic_id = None @@ -609,10 +609,12 @@ def _delete_statistics_meta_duplicates(session: Session) -> int: duplicate_ids = _find_statistics_meta_duplicates(session) if not duplicate_ids: break - for i in range(0, len(duplicate_ids), MAX_ROWS_TO_PURGE): + for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS): deleted_rows = ( session.query(StatisticsMeta) - .filter(StatisticsMeta.id.in_(duplicate_ids[i : i + MAX_ROWS_TO_PURGE])) + .filter( + StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]) + ) .delete(synchronize_session=False) ) total_deleted_rows += deleted_rows diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index 4db92030091..5cda3d283dd 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -1,10 +1,12 @@ """SQLAlchemy util functions.""" from __future__ import annotations -from collections.abc import Callable, Generator, Sequence +from collections.abc import Callable, Generator, Iterable, Sequence from contextlib import contextmanager from datetime import date, datetime, timedelta import functools +from functools import partial +from itertools import islice import logging import os import time @@ -761,3 +763,19 @@ def resolve_period( end_time += offset return (start_time, end_time) + + +def take(take_num: int, iterable: Iterable) -> list[Any]: + """Return first n items of the iterable as a list. + + From itertools recipes + """ + return list(islice(iterable, take_num)) + + +def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]: + """Break *iterable* into lists of length *n*. 
+ + From more-itertools + """ + return iter(partial(take, chunked_num, iter(iterable)), []) diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index fce8bc38515..44c3ffac99e 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -374,6 +374,8 @@ async def test_schema_migrate( "homeassistant.components.recorder.Recorder._process_state_changed_event_into_session", ), patch( "homeassistant.components.recorder.Recorder._process_non_state_changed_event_into_session", + ), patch( + "homeassistant.components.recorder.Recorder._pre_process_startup_tasks", ): recorder_helper.async_initialize_recorder(hass) hass.async_create_task( diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index a56bcb265a7..c5ce8d272c7 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -9,7 +9,10 @@ from sqlalchemy.exc import DatabaseError, OperationalError from sqlalchemy.orm.session import Session from homeassistant.components import recorder -from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE, SupportedDialect +from homeassistant.components.recorder.const import ( + SQLITE_MAX_BIND_VARS, + SupportedDialect, +) from homeassistant.components.recorder.db_schema import ( EventData, Events, @@ -591,7 +594,7 @@ async def test_purge_cutoff_date( service_data = {"keep_days": 2} # Force multiple purge batches to be run - rows = MAX_ROWS_TO_PURGE + 1 + rows = SQLITE_MAX_BIND_VARS + 1 cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"]) await _add_db_entries(hass, cutoff, rows) @@ -1548,11 +1551,11 @@ async def test_purge_many_old_events( """Test deleting old events.""" instance = await async_setup_recorder_instance(hass) - await _add_test_events(hass, MAX_ROWS_TO_PURGE) + await _add_test_events(hass, SQLITE_MAX_BIND_VARS) with session_scope(hass=hass) as session: events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) - assert events.count() == MAX_ROWS_TO_PURGE * 6 + assert events.count() == SQLITE_MAX_BIND_VARS * 6 purge_before = dt_util.utcnow() - timedelta(days=4) @@ -1565,7 +1568,7 @@ async def test_purge_many_old_events( events_batch_size=3, ) assert not finished - assert events.count() == MAX_ROWS_TO_PURGE * 3 + assert events.count() == SQLITE_MAX_BIND_VARS * 3 # we should only have 2 groups of events left finished = purge_old_data( @@ -1576,7 +1579,7 @@ async def test_purge_many_old_events( events_batch_size=3, ) assert finished - assert events.count() == MAX_ROWS_TO_PURGE * 2 + assert events.count() == SQLITE_MAX_BIND_VARS * 2 # we should now purge everything finished = purge_old_data(
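For reference, the take/chunked helpers moved into util.py above follow the classic itertools/more-itertools recipes: take slices the next n items off an iterator, and chunked keeps calling it through iter(callable, sentinel) until an empty list signals exhaustion, so the final chunk may be shorter than the requested size. A small standalone sketch restating the recipes with example assertions (these are not part of the Home Assistant test suite):

from collections.abc import Iterable
from functools import partial
from itertools import islice
from typing import Any


def take(take_num: int, iterable: Iterable) -> list[Any]:
    """Return the first take_num items of the iterable as a list."""
    return list(islice(iterable, take_num))


def chunked(iterable: Iterable, chunked_num: int) -> Iterable[list[Any]]:
    """Break iterable into lists of length chunked_num; the last may be shorter."""
    return iter(partial(take, chunked_num, iter(iterable)), [])


assert list(chunked(range(5), 2)) == [[0, 1], [2, 3], [4]]
assert list(chunked([], 2)) == []
assert [len(c) for c in chunked(range(998 * 2 + 1), 998)] == [998, 998, 1]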