diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 662be41b1c8..95013de125d 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -34,6 +34,7 @@ from .queries import (
     find_event_types_to_purge,
     find_events_to_purge,
     find_latest_statistics_runs_run_id,
+    find_legacy_detached_states_and_attributes_to_purge,
     find_legacy_event_state_and_attributes_and_data_ids_to_purge,
     find_legacy_row,
     find_short_term_statistics_to_purge,
@@ -146,7 +147,28 @@ def _purge_legacy_format(
     _purge_unused_attributes_ids(instance, session, attributes_ids)
     _purge_event_ids(session, event_ids)
     _purge_unused_data_ids(instance, session, data_ids)
-    return bool(event_ids or state_ids or attributes_ids or data_ids)
+
+    # The database may still have some rows that have an event_id but are not
+    # linked to any event. These rows are not linked to any event because the
+    # event was deleted. We need to purge these rows as well, or we will never
+    # switch to the new format, which would prevent us from purging any events
+    # that happened after the detached states.
+    (
+        detached_state_ids,
+        detached_attributes_ids,
+    ) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
+        session, purge_before
+    )
+    _purge_state_ids(instance, session, detached_state_ids)
+    _purge_unused_attributes_ids(instance, session, detached_attributes_ids)
+    return bool(
+        event_ids
+        or state_ids
+        or attributes_ids
+        or data_ids
+        or detached_state_ids
+        or detached_attributes_ids
+    )
 
 
 def _purge_states_and_attributes_ids(
@@ -412,6 +434,31 @@ def _select_short_term_statistics_to_purge(
     return [statistic.id for statistic in statistics]
 
 
+def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
+    session: Session, purge_before: datetime
+) -> tuple[set[int], set[int]]:
+    """Return a list of state and attribute ids to purge.
+
+    We do not link these anymore since state_changed events
+    do not exist in the events table anymore, however we
+    still need to be able to purge them.
+    """
+    states = session.execute(
+        find_legacy_detached_states_and_attributes_to_purge(
+            dt_util.utc_to_timestamp(purge_before)
+        )
+    ).all()
+    _LOGGER.debug("Selected %s state ids to remove", len(states))
+    state_ids = set()
+    attributes_ids = set()
+    for state in states:
+        if state_id := state.state_id:
+            state_ids.add(state_id)
+        if attributes_id := state.attributes_id:
+            attributes_ids.add(attributes_id)
+    return state_ids, attributes_ids
+
+
 def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
     session: Session, purge_before: datetime
 ) -> tuple[set[int], set[int], set[int], set[int]]:
@@ -433,12 +480,12 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
     data_ids = set()
     for event in events:
         event_ids.add(event.event_id)
-        if event.state_id:
-            state_ids.add(event.state_id)
-        if event.attributes_id:
-            attributes_ids.add(event.attributes_id)
-        if event.data_id:
-            data_ids.add(event.data_id)
+        if state_id := event.state_id:
+            state_ids.add(state_id)
+        if attributes_id := event.attributes_id:
+            attributes_ids.add(attributes_id)
+        if data_id := event.data_id:
+            data_ids.add(data_id)
     return event_ids, state_ids, attributes_ids, data_ids
 
 
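Why the extra selection pass in the purge.py hunk matters: the detached-row query is capped at SQLITE_MAX_BIND_VARS rows per call, so a single pass may not remove every detached row. Because the detached ids are folded into the return value, the purge is reported as unfinished and the legacy path is run again until the table is clean. A hedged sketch of that draining behaviour, reusing the helper names from this diff but wrapped in a hypothetical driver function (_drain_detached_states does not exist in the recorder):

    def _drain_detached_states(instance, session, purge_before) -> None:
        """Illustration only: repeat the capped selection until nothing is left."""
        while True:
            state_ids, attributes_ids = (
                _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
                    session, purge_before
                )
            )
            if not state_ids:
                break  # no detached rows remain below purge_before
            _purge_state_ids(instance, session, state_ids)
            _purge_unused_attributes_ids(instance, session, attributes_ids)

In the actual change there is no such loop; _purge_legacy_format simply reports that work was done and is invoked again on the next purge pass.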
diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py
index f8a1b769d87..49f66fdcd68 100644
--- a/homeassistant/components/recorder/queries.py
+++ b/homeassistant/components/recorder/queries.py
@@ -678,6 +678,22 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
     )
 
 
+def find_legacy_detached_states_and_attributes_to_purge(
+    purge_before: float,
+) -> StatementLambdaElement:
+    """Find state rows whose event_id has no matching row in Events."""
+    return lambda_stmt(
+        lambda: select(States.state_id, States.attributes_id)
+        .outerjoin(Events, States.event_id == Events.event_id)
+        .filter(States.event_id.isnot(None))
+        .filter(
+            (States.last_updated_ts < purge_before) | States.last_updated_ts.is_(None)
+        )
+        .filter(Events.event_id.is_(None))
+        .limit(SQLITE_MAX_BIND_VARS)
+    )
+
+
 def find_legacy_row() -> StatementLambdaElement:
     """Check if there are still states in the table with an event_id."""
     # https://github.com/sqlalchemy/sqlalchemy/issues/9189
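The new statement is a standard anti-join: LEFT OUTER JOIN States to Events on event_id and keep only the rows where no event matched, while also sweeping up rows whose last_updated_ts was never set. A self-contained sketch of the same pattern against a throwaway in-memory SQLite database (toy tables and column names, not the recorder schema):

    from sqlalchemy import (
        Column,
        Float,
        Integer,
        MetaData,
        Table,
        create_engine,
        insert,
        select,
    )

    metadata = MetaData()
    # Toy stand-ins for the recorder's Events/States tables.
    events = Table("events", metadata, Column("event_id", Integer, primary_key=True))
    states = Table(
        "states",
        metadata,
        Column("state_id", Integer, primary_key=True),
        Column("event_id", Integer),  # deliberately no FK constraint
        Column("last_updated_ts", Float),
    )

    engine = create_engine("sqlite://")
    metadata.create_all(engine)
    with engine.begin() as conn:
        conn.execute(insert(events), [{"event_id": 1}])
        conn.execute(
            insert(states),
            [
                {"state_id": 10, "event_id": 1, "last_updated_ts": 5.0},  # still linked
                {"state_id": 11, "event_id": 999, "last_updated_ts": 5.0},  # detached
                {"state_id": 12, "event_id": 999, "last_updated_ts": None},  # detached, broken ts
                {"state_id": 13, "event_id": None, "last_updated_ts": 5.0},  # new format
            ],
        )

    purge_before = 10.0
    stmt = (
        select(states.c.state_id)
        .outerjoin(events, states.c.event_id == events.c.event_id)
        .where(states.c.event_id.is_not(None))  # legacy-format rows only
        .where(
            (states.c.last_updated_ts < purge_before)
            | states.c.last_updated_ts.is_(None)  # include rows with a broken timestamp
        )
        .where(events.c.event_id.is_(None))  # anti-join: the event is gone
        .limit(4000)  # batch, like SQLITE_MAX_BIND_VARS
    )
    with engine.connect() as conn:
        print(conn.execute(stmt).scalars().all())  # the detached rows: [11, 12]

State 10 survives because its event still exists, and state 13 is skipped because it already uses the new format (no event_id).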
diff --git a/tests/components/recorder/test_purge_v32_schema.py b/tests/components/recorder/test_purge_v32_schema.py
index 433ff01eb91..613c17b3d39 100644
--- a/tests/components/recorder/test_purge_v32_schema.py
+++ b/tests/components/recorder/test_purge_v32_schema.py
@@ -8,6 +8,7 @@ from unittest.mock import MagicMock, patch
 
 from freezegun import freeze_time
 import pytest
+from sqlalchemy import text, update
 from sqlalchemy.exc import DatabaseError, OperationalError
 from sqlalchemy.orm.session import Session
 
@@ -1000,7 +1001,7 @@ async def test_purge_many_old_events(
 async def test_purge_can_mix_legacy_and_new_format(
     async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
 ) -> None:
-    """Test purging with legacy a new events."""
+    """Test purging with legacy and new events."""
     instance = await async_setup_recorder_instance(hass)
     await _async_attach_db_engine(hass)
 
@@ -1018,6 +1019,7 @@ async def test_purge_can_mix_legacy_and_new_format(
     utcnow = dt_util.utcnow()
     eleven_days_ago = utcnow - timedelta(days=11)
+
 
     with session_scope(hass=hass) as session:
         broken_state_no_time = States(
             event_id=None,
@@ -1104,6 +1106,150 @@ async def test_purge_can_mix_legacy_and_new_format(
     assert states_without_event_id.count() == 1
+
+
+async def test_purge_can_mix_legacy_and_new_format_with_detached_state(
+    async_setup_recorder_instance: RecorderInstanceGenerator,
+    hass: HomeAssistant,
+    recorder_db_url: str,
+) -> None:
+    """Test purging with legacy and new events with a detached state."""
+    if recorder_db_url.startswith(("mysql://", "postgresql://")):
+        return pytest.skip("This test disables foreign key checks on SQLite")
+
+    instance = await async_setup_recorder_instance(hass)
+    await _async_attach_db_engine(hass)
+
+    await async_wait_recording_done(hass)
+    # New databases are no longer created with the legacy events index
+    assert instance.use_legacy_events_index is False
+
+    def _recreate_legacy_events_index():
+        """Recreate the legacy events index since it's no longer created on new instances."""
+        migration._create_index(instance.get_session, "states", "ix_states_event_id")
+        instance.use_legacy_events_index = True
+
+    await instance.async_add_executor_job(_recreate_legacy_events_index)
+    assert instance.use_legacy_events_index is True
+
+    with session_scope(hass=hass) as session:
+        session.execute(text("PRAGMA foreign_keys = OFF"))
+
+    utcnow = dt_util.utcnow()
+    eleven_days_ago = utcnow - timedelta(days=11)
+
+    with session_scope(hass=hass) as session:
+        broken_state_no_time = States(
+            event_id=None,
+            entity_id="orphened.state",
+            last_updated_ts=None,
+            last_changed_ts=None,
+        )
+        session.add(broken_state_no_time)
+        detached_state_deleted_event_id = States(
+            event_id=99999999999,
+            entity_id="event.deleted",
+            last_updated_ts=1,
+            last_changed_ts=None,
+        )
+        session.add(detached_state_deleted_event_id)
+        detached_state_deleted_event_id.last_changed = None
+        detached_state_deleted_event_id.last_changed_ts = None
+        detached_state_deleted_event_id.last_updated = None
+        detached_state_deleted_event_id = States(
+            event_id=99999999999,
+            entity_id="event.deleted.no_time",
+            last_updated_ts=None,
+            last_changed_ts=None,
+        )
+        detached_state_deleted_event_id.last_changed = None
+        detached_state_deleted_event_id.last_changed_ts = None
+        detached_state_deleted_event_id.last_updated = None
+        detached_state_deleted_event_id.last_updated_ts = None
+        session.add(detached_state_deleted_event_id)
+        start_id = 50000
+        for event_id in range(start_id, start_id + 50):
+            _add_state_and_state_changed_event(
+                session,
+                "sensor.excluded",
+                "purgeme",
+                eleven_days_ago,
+                event_id,
+            )
+    with session_scope(hass=hass) as session:
+        session.execute(
+            update(States)
+            .where(States.entity_id == "event.deleted.no_time")
+            .values(last_updated_ts=None)
+        )
+
+    await _add_test_events(hass, 50)
+    await _add_events_with_event_data(hass, 50)
+    with session_scope(hass=hass) as session:
+        for _ in range(50):
+            _add_state_without_event_linkage(
+                session, "switch.random", "on", eleven_days_ago
+            )
+        states_with_event_id = session.query(States).filter(
+            States.event_id.is_not(None)
+        )
+        states_without_event_id = session.query(States).filter(
+            States.event_id.is_(None)
+        )
+
+        assert states_with_event_id.count() == 52
+        assert states_without_event_id.count() == 51
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+        )
+        assert not finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 51
+        # At this point all the legacy states are gone
+        # and we switch methods
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=1,
+            states_batch_size=1,
+        )
+        # Since we only allow one iteration, we won't
+        # check if we are finished on this loop, similar
+        # to the legacy method
+        assert not finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+            events_batch_size=100,
+            states_batch_size=100,
+        )
+        assert finished
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
+        _add_state_without_event_linkage(
+            session, "switch.random", "on", eleven_days_ago
+        )
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 2
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            repack=False,
+        )
+        assert finished
+        # The broken state without a timestamp
+        # does not prevent future purges. It's ignored.
+        assert states_with_event_id.count() == 0
+        assert states_without_event_id.count() == 1
 
 
 async def test_purge_entities_keep_days(
     async_setup_recorder_instance: RecorderInstanceGenerator,
     hass: HomeAssistant,
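A note on the SQLite-only guard in the new test: it depends on being able to write a States row whose event_id points at an event that was never recorded, which requires foreign key enforcement to be off. A minimal standalone illustration of that mechanism with the sqlite3 module (toy tables, not the recorder's test harness):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        -- OFF is also SQLite's default; the PRAGMA just makes the intent explicit.
        PRAGMA foreign_keys = OFF;
        CREATE TABLE events (event_id INTEGER PRIMARY KEY);
        CREATE TABLE states (
            state_id INTEGER PRIMARY KEY,
            event_id INTEGER REFERENCES events (event_id)
        );
        -- No event 99999999999 exists, yet the insert succeeds: a detached state.
        INSERT INTO states (state_id, event_id) VALUES (1, 99999999999);
        """
    )
    print(conn.execute("SELECT * FROM states").fetchall())  # [(1, 99999999999)]

With enforcement on (PRAGMA foreign_keys = ON before the insert) SQLite rejects the orphaned row, and the equivalent shortcut is not taken on MySQL/PostgreSQL, which is why the test skips those backends.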