Avoid drift in recorder purge cut-off (#52135)

Author: PeteBa, 2021-06-25 22:29:38 +01:00 (committed by GitHub)
parent a71af8e9d3
commit 42c4317628
3 changed files with 114 additions and 19 deletions
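
In short: before this change, PurgeTask carried keep_days and the worker computed the cut-off as utcnow() - keep_days at execution time. A purge that runs in batches and requeues itself therefore recomputed the cut-off on every pass, letting it drift forward past the cut-off the caller asked for. This commit computes purge_before once, when the task is enqueued, and threads that fixed datetime through the queue. A minimal standalone sketch of the drift (plain Python, not Home Assistant code; the sleep stands in for a slow purge batch):

    from datetime import datetime, timedelta, timezone
    import time

    KEEP_DAYS = 2

    def drifting_cutoff() -> datetime:
        # Old behaviour: each retry derives a fresh cut-off from "now".
        return datetime.now(timezone.utc) - timedelta(days=KEEP_DAYS)

    first = drifting_cutoff()
    time.sleep(0.01)  # stand-in for a slow purge batch
    second = drifting_cutoff()
    assert second > first  # the cut-off crept forward between batches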

homeassistant/components/recorder/__init__.py

@@ -306,7 +306,7 @@ def _async_register_services(hass, instance):
 class PurgeTask(NamedTuple):
     """Object to store information about purge task."""
 
-    keep_days: int
+    purge_before: datetime
     repack: bool
     apply_filter: bool
@ -451,7 +451,8 @@ class Recorder(threading.Thread):
repack = kwargs.get(ATTR_REPACK) repack = kwargs.get(ATTR_REPACK)
apply_filter = kwargs.get(ATTR_APPLY_FILTER) apply_filter = kwargs.get(ATTR_APPLY_FILTER)
self.queue.put(PurgeTask(keep_days, repack, apply_filter)) purge_before = dt_util.utcnow() - timedelta(days=keep_days)
self.queue.put(PurgeTask(purge_before, repack, apply_filter))
def do_adhoc_purge_entities(self, entity_ids, domains, entity_globs): def do_adhoc_purge_entities(self, entity_ids, domains, entity_globs):
"""Trigger an adhoc purge of requested entities.""" """Trigger an adhoc purge of requested entities."""
@@ -538,7 +539,8 @@ class Recorder(threading.Thread):
             # Purge will schedule the perodic cleanups
             # after it completes to ensure it does not happen
             # until after the database is vacuumed
-            self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False))
+            purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
+            self.queue.put(PurgeTask(purge_before, repack=False, apply_filter=False))
         else:
             self.queue.put(PerodicCleanupTask())
@@ -696,16 +698,16 @@ class Recorder(threading.Thread):
         self.migration_in_progress = False
         persistent_notification.dismiss(self.hass, "recorder_database_migration")
 
-    def _run_purge(self, keep_days, repack, apply_filter):
+    def _run_purge(self, purge_before, repack, apply_filter):
         """Purge the database."""
-        if purge.purge_old_data(self, keep_days, repack, apply_filter):
+        if purge.purge_old_data(self, purge_before, repack, apply_filter):
             # We always need to do the db cleanups after a purge
             # is finished to ensure the WAL checkpoint and other
             # tasks happen after a vacuum.
             perodic_db_cleanups(self)
             return
         # Schedule a new purge task if this one didn't finish
-        self.queue.put(PurgeTask(keep_days, repack, apply_filter))
+        self.queue.put(PurgeTask(purge_before, repack, apply_filter))
 
     def _run_purge_entities(self, entity_filter):
         """Purge entities from the database."""
@@ -724,7 +726,7 @@ class Recorder(threading.Thread):
     def _process_one_event(self, event):
         """Process one event."""
         if isinstance(event, PurgeTask):
-            self._run_purge(event.keep_days, event.repack, event.apply_filter)
+            self._run_purge(event.purge_before, event.repack, event.apply_filter)
             return
         if isinstance(event, PurgeEntitiesTask):
             self._run_purge_entities(event.entity_filter)

homeassistant/components/recorder/purge.py

@@ -1,15 +1,13 @@
 """Purge old data helper."""
 from __future__ import annotations
 
-from datetime import datetime, timedelta
+from datetime import datetime
 import logging
 from typing import TYPE_CHECKING, Callable
 
 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
 
-import homeassistant.util.dt as dt_util
-
 from .const import MAX_ROWS_TO_PURGE
 from .models import Events, RecorderRuns, States
 from .repack import repack_database
@@ -23,13 +21,12 @@ _LOGGER = logging.getLogger(__name__)
 @retryable_database_job("purge")
 def purge_old_data(
-    instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
+    instance: Recorder, purge_before: datetime, repack: bool, apply_filter: bool = False
 ) -> bool:
-    """Purge events and states older than purge_days ago.
+    """Purge events and states older than purge_before.
 
     Cleans up an timeframe of an hour, based on the oldest record.
     """
-    purge_before = dt_util.utcnow() - timedelta(days=purge_days)
     _LOGGER.debug(
         "Purging states and events before target %s",
         purge_before.isoformat(sep=" ", timespec="seconds"),
     )
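
With purge.py no longer importing dt_util or timedelta, the cut-off arithmetic lives entirely with callers. A hedged sketch of the new call shape (the purge_old_data body here is a stub, not the real helper; only the signature mirrors the diff):

    from datetime import datetime, timedelta, timezone

    def purge_old_data(
        instance, purge_before: datetime, repack: bool, apply_filter: bool = False
    ) -> bool:
        """Stub: pretend everything before purge_before was purged."""
        return True

    purge_before = datetime.now(timezone.utc) - timedelta(days=4)
    assert purge_old_data(None, purge_before, repack=False)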

tests/components/recorder/test_purge.py

@@ -8,6 +8,8 @@ from sqlalchemy.exc import DatabaseError, OperationalError
 from sqlalchemy.orm.session import Session
 
 from homeassistant.components import recorder
+from homeassistant.components.recorder import PurgeTask
+from homeassistant.components.recorder.const import MAX_ROWS_TO_PURGE
 from homeassistant.components.recorder.models import Events, RecorderRuns, States
 from homeassistant.components.recorder.purge import purge_old_data
 from homeassistant.components.recorder.util import session_scope
@@ -43,8 +45,10 @@ async def test_purge_old_states(
         events = session.query(Events).filter(Events.event_type == "state_changed")
         assert events.count() == 6
 
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+
         # run purge_old_data()
-        finished = purge_old_data(instance, 4, repack=False)
+        finished = purge_old_data(instance, purge_before, repack=False)
         assert not finished
         assert states.count() == 2
@@ -52,7 +56,7 @@ async def test_purge_old_states(
         assert states_after_purge[1].old_state_id == states_after_purge[0].state_id
         assert states_after_purge[0].old_state_id is None
 
-        finished = purge_old_data(instance, 4, repack=False)
+        finished = purge_old_data(instance, purge_before, repack=False)
         assert finished
         assert states.count() == 2
@@ -162,13 +166,15 @@ async def test_purge_old_events(
         events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
         assert events.count() == 6
 
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+
         # run purge_old_data()
-        finished = purge_old_data(instance, 4, repack=False)
+        finished = purge_old_data(instance, purge_before, repack=False)
         assert not finished
         assert events.count() == 2
 
         # we should only have 2 events left
-        finished = purge_old_data(instance, 4, repack=False)
+        finished = purge_old_data(instance, purge_before, repack=False)
         assert finished
         assert events.count() == 2
@@ -186,11 +192,13 @@ async def test_purge_old_recorder_runs(
         recorder_runs = session.query(RecorderRuns)
         assert recorder_runs.count() == 7
 
+    purge_before = dt_util.utcnow()
+
     # run purge_old_data()
-    finished = purge_old_data(instance, 0, repack=False)
+    finished = purge_old_data(instance, purge_before, repack=False)
     assert not finished
 
-    finished = purge_old_data(instance, 0, repack=False)
+    finished = purge_old_data(instance, purge_before, repack=False)
     assert finished
     assert recorder_runs.count() == 1
@@ -322,6 +330,94 @@ async def test_purge_edge_case(
         assert events.count() == 0
 
 
+async def test_purge_cutoff_date(
+    hass: HomeAssistant,
+    async_setup_recorder_instance: SetupRecorderInstanceT,
+):
+    """Test states and events are purged only if they occurred before "now() - keep_days"."""
+
+    async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> None:
+        timestamp_keep = cutoff
+        timestamp_purge = cutoff - timedelta(microseconds=1)
+
+        with recorder.session_scope(hass=hass) as session:
+            session.add(
+                Events(
+                    event_id=1000,
+                    event_type="KEEP",
+                    event_data="{}",
+                    origin="LOCAL",
+                    created=timestamp_keep,
+                    time_fired=timestamp_keep,
+                )
+            )
+            session.add(
+                States(
+                    entity_id="test.cutoff",
+                    domain="sensor",
+                    state="keep",
+                    attributes="{}",
+                    last_changed=timestamp_keep,
+                    last_updated=timestamp_keep,
+                    created=timestamp_keep,
+                    event_id=1000,
+                )
+            )
+            for row in range(1, rows):
+                session.add(
+                    Events(
+                        event_id=1000 + row,
+                        event_type="PURGE",
+                        event_data="{}",
+                        origin="LOCAL",
+                        created=timestamp_purge,
+                        time_fired=timestamp_purge,
+                    )
+                )
+                session.add(
+                    States(
+                        entity_id="test.cutoff",
+                        domain="sensor",
+                        state="purge",
+                        attributes="{}",
+                        last_changed=timestamp_purge,
+                        last_updated=timestamp_purge,
+                        created=timestamp_purge,
+                        event_id=1000 + row,
+                    )
+                )
+
+    instance = await async_setup_recorder_instance(hass, None)
+    await async_wait_purge_done(hass, instance)
+
+    service_data = {"keep_days": 2}
+
+    # Force multiple purge batches to be run
+    rows = MAX_ROWS_TO_PURGE + 1
+    cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
+    await _add_db_entries(hass, cutoff, rows)
+
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        events = session.query(Events)
+        assert states.filter(States.state == "purge").count() == rows - 1
+        assert states.filter(States.state == "keep").count() == 1
+        assert events.filter(Events.event_type == "PURGE").count() == rows - 1
+        assert events.filter(Events.event_type == "KEEP").count() == 1
+
+        instance.queue.put(PurgeTask(cutoff, repack=False, apply_filter=False))
+        await hass.async_block_till_done()
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+        states = session.query(States)
+        events = session.query(Events)
+        assert states.filter(States.state == "purge").count() == 0
+        assert states.filter(States.state == "keep").count() == 1
+        assert events.filter(Events.event_type == "PURGE").count() == 0
+        assert events.filter(Events.event_type == "KEEP").count() == 1
+
+
 async def test_purge_filtered_states(
     hass: HomeAssistant,
     async_setup_recorder_instance: SetupRecorderInstanceT,