Chunk purging attributes and data ids for old SQLite versions (#104296)

J. Nick Koston 2023-11-24 11:46:02 +01:00 committed by GitHub
parent b41b56e54c
commit 9ed745638d
3 changed files with 75 additions and 24 deletions
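The change in a nutshell: the purge path expands large sets of attribute and event-data ids into SQL IN (...) clauses, one bind variable per id. SQLite's default SQLITE_MAX_VARIABLE_NUMBER is 999 for builds older than 3.32.0 (32766 from 3.32.0 on), so a large purge batch could exceed the cap on older installs. The commit therefore threads the Recorder instance into the selection helpers and processes the ids in chunks of at most instance.max_bind_vars. A minimal, self-contained sketch of that pattern using the stdlib sqlite3 module; the tiny state_attributes table and the select_existing_ids helper are illustrative only, not recorder code:

import sqlite3
from itertools import islice

MAX_BIND_VARS = 999  # SQLite's default host-parameter cap before 3.32.0

def chunked(iterable, size):
    """Yield lists of at most *size* items from *iterable*."""
    it = iter(iterable)
    return iter(lambda: list(islice(it, size)), [])

def select_existing_ids(conn, ids):
    """Return the subset of *ids* present in the table, one chunk per query."""
    found = set()
    for chunk in chunked(sorted(ids), MAX_BIND_VARS):
        placeholders = ",".join("?" * len(chunk))
        rows = conn.execute(
            "SELECT attributes_id FROM state_attributes "
            f"WHERE attributes_id IN ({placeholders})",
            chunk,
        )
        found.update(row[0] for row in rows)
    return found

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE state_attributes (attributes_id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO state_attributes VALUES (?)", [(i,) for i in range(5000)])
# A single IN (...) over 5000 ids would need 5000 bind variables and fail on an
# old SQLite build; chunking keeps every statement under the cap.
print(len(select_existing_ids(conn, set(range(2500, 7500)))))  # 2500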

View File

@@ -41,7 +41,7 @@ from .queries import (
find_statistics_runs_to_purge,
)
from .repack import repack_database
from .util import chunked, retryable_database_job, session_scope
from .util import chunked_or_all, retryable_database_job, session_scope
if TYPE_CHECKING:
from . import Recorder
@@ -283,12 +283,16 @@ def _select_event_data_ids_to_purge(
def _select_unused_attributes_ids(
session: Session, attributes_ids: set[int], database_engine: DatabaseEngine
instance: Recorder,
session: Session,
attributes_ids: set[int],
database_engine: DatabaseEngine,
) -> set[int]:
"""Return a set of attributes ids that are not used by any states in the db."""
if not attributes_ids:
return set()
seen_ids: set[int] = set()
if not database_engine.optimizer.slow_range_in_select:
#
# SQLite has a superior query optimizer for the distinct query below as it uses
@@ -303,12 +307,17 @@ def _select_unused_attributes_ids(
# (136723);
# ...Using index
#
seen_ids = {
state[0]
for state in session.execute(
attributes_ids_exist_in_states_with_fast_in_distinct(attributes_ids)
).all()
}
for attributes_ids_chunk in chunked_or_all(
attributes_ids, instance.max_bind_vars
):
seen_ids.update(
state[0]
for state in session.execute(
attributes_ids_exist_in_states_with_fast_in_distinct(
attributes_ids_chunk
)
).all()
)
else:
#
# This branch is for DBMS that cannot optimize the distinct query well and has
@@ -334,7 +343,6 @@ def _select_unused_attributes_ids(
# We now break the query into groups of 100 and use a lambda_stmt to ensure
# that the query is only cached once.
#
seen_ids = set()
groups = [iter(attributes_ids)] * 100
for attr_ids in zip_longest(*groups, fillvalue=None):
seen_ids |= {
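The grouping idiom in this branch is easy to misread: groups holds the same iterator object 100 times, so zip_longest pulls 100 ids per group and pads the last group with None, and the constant group size means every generated statement has the same shape (which is why, per the comment above, the lambda_stmt is only cached once). A standalone sketch with illustrative data:

from itertools import zip_longest

attributes_ids = list(range(250))
groups = [iter(attributes_ids)] * 100          # one iterator, referenced 100 times
for attr_ids in zip_longest(*groups, fillvalue=None):
    real_ids = [id_ for id_ in attr_ids if id_ is not None]
    print(len(attr_ids), len(real_ids))        # 100 100, then 100 100, then 100 50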
@@ -361,29 +369,33 @@ def _purge_unused_attributes_ids(
database_engine = instance.database_engine
assert database_engine is not None
if unused_attribute_ids_set := _select_unused_attributes_ids(
session, attributes_ids_batch, database_engine
instance, session, attributes_ids_batch, database_engine
):
_purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
def _select_unused_event_data_ids(
session: Session, data_ids: set[int], database_engine: DatabaseEngine
instance: Recorder,
session: Session,
data_ids: set[int],
database_engine: DatabaseEngine,
) -> set[int]:
"""Return a set of event data ids that are not used by any events in the db."""
if not data_ids:
return set()
seen_ids: set[int] = set()
# See _select_unused_attributes_ids for why this function
# branches for non-sqlite databases.
if not database_engine.optimizer.slow_range_in_select:
seen_ids = {
state[0]
for state in session.execute(
data_ids_exist_in_events_with_fast_in_distinct(data_ids)
).all()
}
for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
seen_ids.update(
state[0]
for state in session.execute(
data_ids_exist_in_events_with_fast_in_distinct(data_ids_chunk)
).all()
)
else:
seen_ids = set()
groups = [iter(data_ids)] * 100
for data_ids_group in zip_longest(*groups, fillvalue=None):
seen_ids |= {
@@ -404,7 +416,7 @@ def _purge_unused_data_ids(
database_engine = instance.database_engine
assert database_engine is not None
if unused_data_ids_set := _select_unused_event_data_ids(
session, data_ids_batch, database_engine
instance, session, data_ids_batch, database_engine
):
_purge_batch_data_ids(instance, session, unused_data_ids_set)
@@ -519,7 +531,7 @@ def _purge_batch_attributes_ids(
instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
"""Delete old attributes ids in batches of max_bind_vars."""
for attributes_ids_chunk in chunked(attributes_ids, instance.max_bind_vars):
for attributes_ids_chunk in chunked_or_all(attributes_ids, instance.max_bind_vars):
deleted_rows = session.execute(
delete_states_attributes_rows(attributes_ids_chunk)
)
@@ -533,7 +545,7 @@ def _purge_batch_data_ids(
instance: Recorder, session: Session, data_ids: set[int]
) -> None:
"""Delete old event data ids in batches of max_bind_vars."""
for data_ids_chunk in chunked(data_ids, instance.max_bind_vars):
for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
_LOGGER.debug("Deleted %s data events", deleted_rows)
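The same budget applies on the delete side: _purge_batch_attributes_ids and _purge_batch_data_ids now walk the id set with chunked_or_all so each DELETE binds at most max_bind_vars parameters. A rough stdlib-sqlite3 equivalent; the event_data table and the delete_in_batches helper are illustrative only:

import sqlite3
from itertools import islice

def delete_in_batches(conn, ids, max_bind_vars=999):
    """Delete rows in batches so no single DELETE exceeds the bind-variable cap."""
    it = iter(sorted(ids))
    while batch := list(islice(it, max_bind_vars)):
        placeholders = ",".join("?" * len(batch))
        cur = conn.execute(
            f"DELETE FROM event_data WHERE data_id IN ({placeholders})", batch
        )
        print(f"Deleted {cur.rowcount} data rows")  # mirrors the debug logging above

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_data (data_id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO event_data VALUES (?)", [(i,) for i in range(2000)])
delete_in_batches(conn, set(range(2000)))  # Deleted 999, then 999, then 2 data rows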
@@ -694,7 +706,10 @@ def _purge_filtered_states(
# we will need to purge them here.
_purge_event_ids(session, filtered_event_ids)
unused_attribute_ids_set = _select_unused_attributes_ids(
session, {id_ for id_ in attributes_ids if id_ is not None}, database_engine
instance,
session,
{id_ for id_ in attributes_ids if id_ is not None},
database_engine,
)
_purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
return False
@@ -741,7 +756,7 @@ def _purge_filtered_events(
_purge_state_ids(instance, session, state_ids)
_purge_event_ids(session, event_ids_set)
if unused_data_ids_set := _select_unused_event_data_ids(
session, set(data_ids), database_engine
instance, session, set(data_ids), database_engine
):
_purge_batch_data_ids(instance, session, unused_data_ids_set)
return False
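The "old SQLite versions" in the title refers to the budget the chunks are measured against: instance.max_bind_vars depends on the detected database engine. A hedged sketch of how such a budget could be chosen; the constant names and the modern/default value below are assumptions rather than the recorder's actual constants, only the SQLite 3.32.0 threshold (default of 999 vs 32766 host parameters) is a documented SQLite fact:

import sqlite3

OLD_SQLITE_BUDGET = 999  # assumed budget for SQLite builds older than 3.32.0
DEFAULT_BUDGET = 4000    # assumed budget otherwise; not the recorder's actual constant

def pick_max_bind_vars(sqlite_version: tuple[int, ...]) -> int:
    """Pick a per-statement bind-variable budget for the detected SQLite (sketch)."""
    return OLD_SQLITE_BUDGET if sqlite_version < (3, 32, 0) else DEFAULT_BUDGET

detected = tuple(int(part) for part in sqlite3.sqlite_version.split("."))
print(pick_max_bind_vars(detected))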

View File

@@ -1,7 +1,7 @@
"""SQLAlchemy util functions."""
from __future__ import annotations
from collections.abc import Callable, Generator, Iterable, Sequence
from collections.abc import Callable, Collection, Generator, Iterable, Sequence
from contextlib import contextmanager
from datetime import date, datetime, timedelta
import functools
@@ -857,6 +857,20 @@ def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
return iter(partial(take, chunked_num, iter(iterable)), [])
def chunked_or_all(iterable: Collection[Any], chunked_num: int) -> Iterable[Any]:
"""Break *collection* into iterables of length *n*.
Returns the collection if its length is less than or equal to *n*.
Unlike chunked, this function requires a collection so it can
determine the length of the collection and return the collection
if it is no larger than *n*.
"""
if len(iterable) <= chunked_num:
return (iterable,)
return chunked(iterable, chunked_num)
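For reference, the two helpers side by side as a self-contained sketch. take is not shown in the hunk above, so the version here (list(islice(...))) is an assumption inferred from the partial(take, ...) call; the point of chunked_or_all is the early return, which hands a set that already fits the budget back untouched as the only chunk instead of copying it:

from collections.abc import Collection, Iterable
from functools import partial
from itertools import islice
from typing import Any

def take(take_num: int, iterable: Iterable) -> list[Any]:
    """Return the first take_num items of iterable as a list (assumed helper)."""
    return list(islice(iterable, take_num))

def chunked(iterable: Iterable, chunked_num: int) -> Iterable[Any]:
    """Lazily yield lists of at most chunked_num items."""
    return iter(partial(take, chunked_num, iter(iterable)), [])

def chunked_or_all(iterable: Collection[Any], chunked_num: int) -> Iterable[Any]:
    """Yield the collection itself when it already fits in a single chunk."""
    if len(iterable) <= chunked_num:
        return (iterable,)  # no copy: the original collection is the only chunk
    return chunked(iterable, chunked_num)

ids = set(range(5))
assert next(iter(chunked_or_all(ids, 10))) is ids  # small set is passed through
assert [len(chunk) for chunk in chunked_or_all(ids, 2)] == [2, 2, 1]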
def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
"""Get an index by name."""
connection = session.connection()

View File

@@ -25,6 +25,7 @@ from homeassistant.components.recorder.models import (
process_timestamp,
)
from homeassistant.components.recorder.util import (
chunked_or_all,
end_incomplete_runs,
is_second_sunday,
resolve_period,
@@ -1023,3 +1024,24 @@ async def test_resolve_period(hass: HomeAssistant) -> None:
}
}
) == (now - timedelta(hours=1, minutes=25), now - timedelta(minutes=25))
def test_chunked_or_all():
"""Test chunked_or_all can iterate chunk sizes larger than the passed in collection."""
all = []
incoming = (1, 2, 3, 4)
for chunk in chunked_or_all(incoming, 2):
assert len(chunk) == 2
all.extend(chunk)
assert all == [1, 2, 3, 4]
all = []
incoming = (1, 2, 3, 4)
for chunk in chunked_or_all(incoming, 5):
assert len(chunk) == 4
# Verify the chunk is the same object as the incoming
# collection since we want to avoid copying the collection
# if we don't need to
assert chunk is incoming
all.extend(chunk)
assert all == [1, 2, 3, 4]