From 42c431762897c124310c2a02dc5bf20e099bf568 Mon Sep 17 00:00:00 2001
From: PeteBa
Date: Fri, 25 Jun 2021 22:29:38 +0100
Subject: [PATCH] Avoid drift in recorder purge cut-off (#52135)

---
 homeassistant/components/recorder/__init__.py |  16 +--
 homeassistant/components/recorder/purge.py    |   9 +-
 tests/components/recorder/test_purge.py       | 108 +++++++++++++++++-
 3 files changed, 114 insertions(+), 19 deletions(-)

diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 0d6dddfa2d5..c16d7a2d198 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -306,7 +306,7 @@ def _async_register_services(hass, instance):
 class PurgeTask(NamedTuple):
     """Object to store information about purge task."""
 
-    keep_days: int
+    purge_before: datetime
     repack: bool
     apply_filter: bool
 
@@ -451,7 +451,8 @@ class Recorder(threading.Thread):
         repack = kwargs.get(ATTR_REPACK)
         apply_filter = kwargs.get(ATTR_APPLY_FILTER)
 
-        self.queue.put(PurgeTask(keep_days, repack, apply_filter))
+        purge_before = dt_util.utcnow() - timedelta(days=keep_days)
+        self.queue.put(PurgeTask(purge_before, repack, apply_filter))
 
     def do_adhoc_purge_entities(self, entity_ids, domains, entity_globs):
         """Trigger an adhoc purge of requested entities."""
@@ -538,7 +539,8 @@ class Recorder(threading.Thread):
             # Purge will schedule the perodic cleanups
             # after it completes to ensure it does not happen
             # until after the database is vacuumed
-            self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False))
+            purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
+            self.queue.put(PurgeTask(purge_before, repack=False, apply_filter=False))
         else:
             self.queue.put(PerodicCleanupTask())
 
@@ -696,16 +698,16 @@ class Recorder(threading.Thread):
             self.migration_in_progress = False
             persistent_notification.dismiss(self.hass, "recorder_database_migration")
 
-    def _run_purge(self, keep_days, repack, apply_filter):
+    def _run_purge(self, purge_before, repack, apply_filter):
         """Purge the database."""
-        if purge.purge_old_data(self, keep_days, repack, apply_filter):
+        if purge.purge_old_data(self, purge_before, repack, apply_filter):
             # We always need to do the db cleanups after a purge
             # is finished to ensure the WAL checkpoint and other
             # tasks happen after a vacuum.
             perodic_db_cleanups(self)
             return
         # Schedule a new purge task if this one didn't finish
-        self.queue.put(PurgeTask(keep_days, repack, apply_filter))
+        self.queue.put(PurgeTask(purge_before, repack, apply_filter))
 
     def _run_purge_entities(self, entity_filter):
         """Purge entities from the database."""
@@ -724,7 +726,7 @@ class Recorder(threading.Thread):
     def _process_one_event(self, event):
         """Process one event."""
         if isinstance(event, PurgeTask):
-            self._run_purge(event.keep_days, event.repack, event.apply_filter)
+            self._run_purge(event.purge_before, event.repack, event.apply_filter)
             return
         if isinstance(event, PurgeEntitiesTask):
             self._run_purge_entities(event.entity_filter)
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index e1cf15e331d..49803117119 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -1,15 +1,13 @@
 """Purge old data helper."""
 from __future__ import annotations
 
-from datetime import datetime, timedelta
+from datetime import datetime
 import logging
 from typing import TYPE_CHECKING, Callable
 
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
 
-import homeassistant.util.dt as dt_util
-
 from .const import MAX_ROWS_TO_PURGE
 from .models import Events, RecorderRuns, States
 from .repack import repack_database
@@ -23,13 +21,12 @@ _LOGGER = logging.getLogger(__name__)
 
 @retryable_database_job("purge")
 def purge_old_data(
-    instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
+    instance: Recorder, purge_before: datetime, repack: bool, apply_filter: bool = False
 ) -> bool:
-    """Purge events and states older than purge_days ago.
+    """Purge events and states older than purge_before.
 
     Cleans up an timeframe of an hour, based on the oldest record.
""" - purge_before = dt_util.utcnow() - timedelta(days=purge_days) _LOGGER.debug( "Purging states and events before target %s", purge_before.isoformat(sep=" ", timespec="seconds"), diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 6727b4da495..40ad71096c1 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -8,6 +8,8 @@ from sqlalchemy.exc import DatabaseError, OperationalError from sqlalchemy.orm.session import Session from homeassistant.components import recorder +from homeassistant.components.recorder import PurgeTask +from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE from homeassistant.components.recorder.models import Events, RecorderRuns, States from homeassistant.components.recorder.purge import purge_old_data from homeassistant.components.recorder.util import session_scope @@ -43,8 +45,10 @@ async def test_purge_old_states( events = session.query(Events).filter(Events.event_type == "state_changed") assert events.count() == 6 + purge_before = dt_util.utcnow() - timedelta(days=4) + # run purge_old_data() - finished = purge_old_data(instance, 4, repack=False) + finished = purge_old_data(instance, purge_before, repack=False) assert not finished assert states.count() == 2 @@ -52,7 +56,7 @@ async def test_purge_old_states( assert states_after_purge[1].old_state_id == states_after_purge[0].state_id assert states_after_purge[0].old_state_id is None - finished = purge_old_data(instance, 4, repack=False) + finished = purge_old_data(instance, purge_before, repack=False) assert finished assert states.count() == 2 @@ -162,13 +166,15 @@ async def test_purge_old_events( events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%")) assert events.count() == 6 + purge_before = dt_util.utcnow() - timedelta(days=4) + # run purge_old_data() - finished = purge_old_data(instance, 4, repack=False) + finished = purge_old_data(instance, purge_before, repack=False) assert not finished assert events.count() == 2 # we should only have 2 events left - finished = purge_old_data(instance, 4, repack=False) + finished = purge_old_data(instance, purge_before, repack=False) assert finished assert events.count() == 2 @@ -186,11 +192,13 @@ async def test_purge_old_recorder_runs( recorder_runs = session.query(RecorderRuns) assert recorder_runs.count() == 7 + purge_before = dt_util.utcnow() + # run purge_old_data() - finished = purge_old_data(instance, 0, repack=False) + finished = purge_old_data(instance, purge_before, repack=False) assert not finished - finished = purge_old_data(instance, 0, repack=False) + finished = purge_old_data(instance, purge_before, repack=False) assert finished assert recorder_runs.count() == 1 @@ -322,6 +330,94 @@ async def test_purge_edge_case( assert events.count() == 0 +async def test_purge_cutoff_date( + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, +): + """Test states and events are purged only if they occurred before "now() - keep_days".""" + + async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> None: + timestamp_keep = cutoff + timestamp_purge = cutoff - timedelta(microseconds=1) + + with recorder.session_scope(hass=hass) as session: + session.add( + Events( + event_id=1000, + event_type="KEEP", + event_data="{}", + origin="LOCAL", + created=timestamp_keep, + time_fired=timestamp_keep, + ) + ) + session.add( + States( + entity_id="test.cutoff", + domain="sensor", + state="keep", + 
attributes="{}", + last_changed=timestamp_keep, + last_updated=timestamp_keep, + created=timestamp_keep, + event_id=1000, + ) + ) + for row in range(1, rows): + session.add( + Events( + event_id=1000 + row, + event_type="PURGE", + event_data="{}", + origin="LOCAL", + created=timestamp_purge, + time_fired=timestamp_purge, + ) + ) + session.add( + States( + entity_id="test.cutoff", + domain="sensor", + state="purge", + attributes="{}", + last_changed=timestamp_purge, + last_updated=timestamp_purge, + created=timestamp_purge, + event_id=1000 + row, + ) + ) + + instance = await async_setup_recorder_instance(hass, None) + await async_wait_purge_done(hass, instance) + + service_data = {"keep_days": 2} + + # Force multiple purge batches to be run + rows = MAX_ROWS_TO_PURGE + 1 + cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"]) + await _add_db_entries(hass, cutoff, rows) + + with session_scope(hass=hass) as session: + states = session.query(States) + events = session.query(Events) + assert states.filter(States.state == "purge").count() == rows - 1 + assert states.filter(States.state == "keep").count() == 1 + assert events.filter(Events.event_type == "PURGE").count() == rows - 1 + assert events.filter(Events.event_type == "KEEP").count() == 1 + + instance.queue.put(PurgeTask(cutoff, repack=False, apply_filter=False)) + await hass.async_block_till_done() + await async_recorder_block_till_done(hass, instance) + await async_wait_purge_done(hass, instance) + + states = session.query(States) + events = session.query(Events) + assert states.filter(States.state == "purge").count() == 0 + assert states.filter(States.state == "keep").count() == 1 + assert events.filter(Events.event_type == "PURGE").count() == 0 + assert events.filter(Events.event_type == "KEEP").count() == 1 + + async def test_purge_filtered_states( hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT,