diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 8bc6584c5a1..8dd539f84f3 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -41,7 +41,7 @@ from .queries import (
     find_statistics_runs_to_purge,
 )
 from .repack import repack_database
-from .util import chunked, retryable_database_job, session_scope
+from .util import chunked_or_all, retryable_database_job, session_scope

 if TYPE_CHECKING:
     from . import Recorder
@@ -283,12 +283,16 @@ def _select_event_data_ids_to_purge(


 def _select_unused_attributes_ids(
-    session: Session, attributes_ids: set[int], database_engine: DatabaseEngine
+    instance: Recorder,
+    session: Session,
+    attributes_ids: set[int],
+    database_engine: DatabaseEngine,
 ) -> set[int]:
     """Return a set of attributes ids that are not used by any states in the db."""
     if not attributes_ids:
         return set()

+    seen_ids: set[int] = set()
     if not database_engine.optimizer.slow_range_in_select:
         #
         # SQLite has a superior query optimizer for the distinct query below as it uses
@@ -303,12 +307,17 @@
         # (136723);
         # ...Using index
         #
-        seen_ids = {
-            state[0]
-            for state in session.execute(
-                attributes_ids_exist_in_states_with_fast_in_distinct(attributes_ids)
-            ).all()
-        }
+        for attributes_ids_chunk in chunked_or_all(
+            attributes_ids, instance.max_bind_vars
+        ):
+            seen_ids.update(
+                state[0]
+                for state in session.execute(
+                    attributes_ids_exist_in_states_with_fast_in_distinct(
+                        attributes_ids_chunk
+                    )
+                ).all()
+            )
     else:
         #
         # This branch is for DBMS that cannot optimize the distinct query well and has
@@ -334,7 +343,6 @@
         # We now break the query into groups of 100 and use a lambda_stmt to ensure
         # that the query is only cached once.
         #
-        seen_ids = set()
         groups = [iter(attributes_ids)] * 100
         for attr_ids in zip_longest(*groups, fillvalue=None):
             seen_ids |= {
@@ -361,29 +369,33 @@ def _purge_unused_attributes_ids(
     database_engine = instance.database_engine
     assert database_engine is not None
     if unused_attribute_ids_set := _select_unused_attributes_ids(
-        session, attributes_ids_batch, database_engine
+        instance, session, attributes_ids_batch, database_engine
     ):
         _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)


 def _select_unused_event_data_ids(
-    session: Session, data_ids: set[int], database_engine: DatabaseEngine
+    instance: Recorder,
+    session: Session,
+    data_ids: set[int],
+    database_engine: DatabaseEngine,
 ) -> set[int]:
     """Return a set of event data ids that are not used by any events in the db."""
     if not data_ids:
         return set()

+    seen_ids: set[int] = set()
     # See _select_unused_attributes_ids for why this function
     # branches for non-sqlite databases.
     if not database_engine.optimizer.slow_range_in_select:
-        seen_ids = {
-            state[0]
-            for state in session.execute(
-                data_ids_exist_in_events_with_fast_in_distinct(data_ids)
-            ).all()
-        }
+        for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
+            seen_ids.update(
+                state[0]
+                for state in session.execute(
+                    data_ids_exist_in_events_with_fast_in_distinct(data_ids_chunk)
+                ).all()
+            )
     else:
-        seen_ids = set()
         groups = [iter(data_ids)] * 100
         for data_ids_group in zip_longest(*groups, fillvalue=None):
             seen_ids |= {
@@ -404,7 +416,7 @@ def _purge_unused_data_ids(
     database_engine = instance.database_engine
     assert database_engine is not None
     if unused_data_ids_set := _select_unused_event_data_ids(
-        session, data_ids_batch, database_engine
+        instance, session, data_ids_batch, database_engine
     ):
         _purge_batch_data_ids(instance, session, unused_data_ids_set)

@@ -519,7 +531,7 @@ def _purge_batch_attributes_ids(
     instance: Recorder, session: Session, attributes_ids: set[int]
 ) -> None:
     """Delete old attributes ids in batches of max_bind_vars."""
-    for attributes_ids_chunk in chunked(attributes_ids, instance.max_bind_vars):
+    for attributes_ids_chunk in chunked_or_all(attributes_ids, instance.max_bind_vars):
         deleted_rows = session.execute(
             delete_states_attributes_rows(attributes_ids_chunk)
         )
@@ -533,7 +545,7 @@ def _purge_batch_data_ids(
     instance: Recorder, session: Session, data_ids: set[int]
 ) -> None:
     """Delete old event data ids in batches of max_bind_vars."""
-    for data_ids_chunk in chunked(data_ids, instance.max_bind_vars):
+    for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
         deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
         _LOGGER.debug("Deleted %s data events", deleted_rows)

@@ -694,7 +706,10 @@ def _purge_filtered_states(
         # we will need to purge them here.
         _purge_event_ids(session, filtered_event_ids)
     unused_attribute_ids_set = _select_unused_attributes_ids(
-        session, {id_ for id_ in attributes_ids if id_ is not None}, database_engine
+        instance,
+        session,
+        {id_ for id_ in attributes_ids if id_ is not None},
+        database_engine,
     )
     _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
     return False
@@ -741,7 +756,7 @@ def _purge_filtered_events(
         _purge_state_ids(instance, session, state_ids)
     _purge_event_ids(session, event_ids_set)
     if unused_data_ids_set := _select_unused_event_data_ids(
-        session, set(data_ids), database_engine
+        instance, session, set(data_ids), database_engine
     ):
         _purge_batch_data_ids(instance, session, unused_data_ids_set)
     return False
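Both branches above now share the same shape: declare `seen_ids` up front, then union per-chunk results so that no single `IN` query binds more than `instance.max_bind_vars` parameters. A minimal sketch of that pattern in isolation — `select_seen_ids` and its `query_seen_ids` callable are hypothetical stand-ins for the per-chunk SELECT statements used in the real code:

```python
from collections.abc import Callable, Collection, Iterable

from homeassistant.components.recorder.util import chunked_or_all


def select_seen_ids(
    ids: Collection[int],
    max_bind_vars: int,
    query_seen_ids: Callable[[Collection[int]], Iterable[int]],
) -> set[int]:
    """Union per-chunk results; each query binds at most max_bind_vars ids."""
    seen_ids: set[int] = set()
    for chunk in chunked_or_all(ids, max_bind_vars):
        # Each chunk produces at most max_bind_vars bind parameters,
        # and the union over chunks matches the single large query.
        seen_ids.update(query_seen_ids(chunk))
    return seen_ids
```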
diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py
index f94601bb2cb..2d518d8874b 100644
--- a/homeassistant/components/recorder/util.py
+++ b/homeassistant/components/recorder/util.py
@@ -1,7 +1,7 @@
 """SQLAlchemy util functions."""
 from __future__ import annotations

-from collections.abc import Callable, Generator, Iterable, Sequence
+from collections.abc import Callable, Collection, Generator, Iterable, Sequence
 from contextlib import contextmanager
 from datetime import date, datetime, timedelta
 import functools
@@ -857,6 +857,20 @@ def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
     return iter(partial(take, chunked_num, iter(iterable)), [])


+def chunked_or_all(iterable: Collection[Any], chunked_num: int) -> Iterable[Any]:
+    """Break *collection* into iterables of length *n*.
+
+    Returns the collection if its length is less than or equal to *n*.
+
+    Unlike chunked, this function requires a collection so it can
+    determine the length of the collection and return it unchanged
+    when it does not need to be chunked.
+    """
+    if len(iterable) <= chunked_num:
+        return (iterable,)
+    return chunked(iterable, chunked_num)
+
+
 def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
     """Get an index by name."""
     connection = session.connection()
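Assuming the helper lands as above, the boundary behavior is the point of the change: a collection no larger than the chunk size is handed back as the single chunk, with no copy, while anything larger falls through to `chunked`. A quick illustration:

```python
from homeassistant.components.recorder.util import chunked_or_all

ids = {1, 2, 3, 4, 5}

# Larger than the chunk size: delegates to chunked(), yielding sizes 2 + 2 + 1.
assert sorted(len(chunk) for chunk in chunked_or_all(ids, 2)) == [1, 2, 2]

# Within the chunk size: the collection itself is the only chunk, so no copy is made.
chunks = list(chunked_or_all(ids, 10))
assert len(chunks) == 1 and chunks[0] is ids
```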
diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py
index a7b15a7f12d..0a30895adc9 100644
--- a/tests/components/recorder/test_util.py
+++ b/tests/components/recorder/test_util.py
@@ -25,6 +25,7 @@ from homeassistant.components.recorder.models import (
     process_timestamp,
 )
 from homeassistant.components.recorder.util import (
+    chunked_or_all,
     end_incomplete_runs,
     is_second_sunday,
     resolve_period,
@@ -1023,3 +1024,24 @@ async def test_resolve_period(hass: HomeAssistant) -> None:
             }
         }
     ) == (now - timedelta(hours=1, minutes=25), now - timedelta(minutes=25))
+
+
+def test_chunked_or_all() -> None:
+    """Test chunked_or_all can iterate chunk sizes larger than the passed-in collection."""
+    all_items = []
+    incoming = (1, 2, 3, 4)
+    for chunk in chunked_or_all(incoming, 2):
+        assert len(chunk) == 2
+        all_items.extend(chunk)
+    assert all_items == [1, 2, 3, 4]
+
+    all_items = []
+    incoming = (1, 2, 3, 4)
+    for chunk in chunked_or_all(incoming, 5):
+        assert len(chunk) == 4
+        # Verify the chunk is the same object as the incoming
+        # collection since we want to avoid copying the collection
+        # if we don't need to
+        assert chunk is incoming
+        all_items.extend(chunk)
+    assert all_items == [1, 2, 3, 4]
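One edge case the new test leaves implicit: `chunked` yields nothing for an empty collection (its `iter(..., [])` sentinel fires immediately), whereas `chunked_or_all` returns the empty collection as a single chunk. A possible follow-up test, not part of this patch:

```python
from homeassistant.components.recorder.util import chunked, chunked_or_all


def test_chunked_or_all_empty_collection() -> None:
    """Hypothetical follow-up: an empty collection yields one empty chunk."""
    assert list(chunked([], 2)) == []  # no chunks at all
    assert list(chunked_or_all([], 2)) == [[]]  # the collection itself, once
```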