diff --git a/.coveragerc b/.coveragerc
index 2347ee1902e..db940ed642b 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -785,6 +785,7 @@ omit =
     homeassistant/components/raspyrfm/*
     homeassistant/components/recollect_waste/__init__.py
     homeassistant/components/recollect_waste/sensor.py
+    homeassistant/components/recorder/repack.py
     homeassistant/components/recswitch/switch.py
     homeassistant/components/reddit/*
     homeassistant/components/rejseplanen/sensor.py
diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py
index a2b5ffc6f2a..026628a32df 100644
--- a/homeassistant/components/recorder/const.py
+++ b/homeassistant/components/recorder/const.py
@@ -5,3 +5,6 @@ SQLITE_URL_PREFIX = "sqlite://"
 DOMAIN = "recorder"
 
 CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
+
+# The maximum number of rows (events) we purge in one delete statement
+MAX_ROWS_TO_PURGE = 1000
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py
index 9481e954bde..69e2115ce34 100644
--- a/homeassistant/components/recorder/models.py
+++ b/homeassistant/components/recorder/models.py
@@ -117,7 +117,7 @@ class States(Base):  # type: ignore
     last_updated = Column(DateTime(timezone=True), default=dt_util.utcnow, index=True)
     created = Column(DateTime(timezone=True), default=dt_util.utcnow)
     old_state_id = Column(
-        Integer, ForeignKey("states.state_id", ondelete="SET NULL"), index=True
+        Integer, ForeignKey("states.state_id", ondelete="NO ACTION"), index=True
    )
     event = relationship("Events", uselist=False)
     old_state = relationship("States", remote_side=[state_id])
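Note on the `models.py` change above: with `ondelete="SET NULL"` the database was expected to null out `old_state_id` whenever the referenced state row was deleted, but not every supported backend can do that (the purge code below calls out MSSQL specifically). Relaxing the self-referential foreign key to `NO ACTION` means the purge path must null the column itself before deleting. A minimal, self-contained sketch of that update-then-delete pattern; the `Row` model and the id list are invented for illustration and are not the recorder's actual schema:

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Row(Base):
    """Illustrative stand-in for States; not the recorder's real schema."""

    __tablename__ = "rows"
    id = Column(Integer, primary_key=True)
    # With ondelete="NO ACTION" the database will not null this for us,
    # so deleting a referenced row must be preceded by an UPDATE.
    parent_id = Column(Integer, ForeignKey("rows.id", ondelete="NO ACTION"))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

doomed_ids = [1, 2, 3]  # hypothetical ids already selected for purging

# Step 1: point children away from the rows we are about to delete.
session.query(Row).filter(Row.parent_id.in_(doomed_ids)).update(
    {"parent_id": None}, synchronize_session=False
)
# Step 2: the bulk delete can no longer violate the self-referential FK.
session.query(Row).filter(Row.id.in_(doomed_ids)).delete(synchronize_session=False)
session.commit()
```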
""" purge_before = dt_util.utcnow() - timedelta(days=purge_days) _LOGGER.debug("Purging states and events before target %s", purge_before) - try: - with session_scope(session=instance.get_session()) as session: - # Purge a max of 1 hour, based on the oldest states or events record - batch_purge_before = purge_before - - query = session.query(States).order_by(States.last_updated.asc()).limit(1) - states = execute(query, to_native=True, validate_entity_ids=False) - if states: - batch_purge_before = min( - batch_purge_before, - states[0].last_updated + timedelta(hours=1), - ) - - query = session.query(Events).order_by(Events.time_fired.asc()).limit(1) - events = execute(query, to_native=True) - if events: - batch_purge_before = min( - batch_purge_before, - events[0].time_fired + timedelta(hours=1), - ) - - _LOGGER.debug("Purging states and events before %s", batch_purge_before) - - deleted_rows = ( - session.query(States) - .filter(States.last_updated < batch_purge_before) - .delete(synchronize_session=False) - ) - _LOGGER.debug("Deleted %s states", deleted_rows) - - deleted_rows = ( - session.query(Events) - .filter(Events.time_fired < batch_purge_before) - .delete(synchronize_session=False) - ) - _LOGGER.debug("Deleted %s events", deleted_rows) - - # If states or events purging isn't processing the purge_before yet, - # return false, as we are not done yet. - if batch_purge_before != purge_before: + with session_scope(session=instance.get_session()) as session: # type: ignore + # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record + event_ids = _select_event_ids_to_purge(session, purge_before) + state_ids = _select_state_ids_to_purge(session, purge_before, event_ids) + if state_ids: + _disconnect_states_about_to_be_purged(session, state_ids) + _purge_state_ids(session, state_ids) + if event_ids: + _purge_event_ids(session, event_ids) + # If states or events purging isn't processing the purge_before yet, + # return false, as we are not done yet. 
_LOGGER.debug("Purging hasn't fully completed yet") return False - - # Recorder runs is small, no need to batch run it - deleted_rows = ( - session.query(RecorderRuns) - .filter(RecorderRuns.start < purge_before) - .filter(RecorderRuns.run_id != instance.run_info.run_id) - .delete(synchronize_session=False) - ) - _LOGGER.debug("Deleted %s recorder_runs", deleted_rows) - + _purge_old_recorder_runs(instance, session, purge_before) if repack: - # Execute sqlite or postgresql vacuum command to free up space on disk - if instance.engine.driver in ("pysqlite", "postgresql"): - _LOGGER.debug("Vacuuming SQL DB to free space") - instance.engine.execute("VACUUM") - # Optimize mysql / mariadb tables to free up space on disk - elif instance.engine.driver in ("mysqldb", "pymysql"): - _LOGGER.debug("Optimizing SQL DB to free space") - instance.engine.execute("OPTIMIZE TABLE states, events, recorder_runs") - + repack_database(instance) except OperationalError as err: # Retry when one of the following MySQL errors occurred: # 1205: Lock wait timeout exceeded; try restarting transaction @@ -101,3 +64,78 @@ def purge_old_data(instance, purge_days: int, repack: bool) -> bool: except SQLAlchemyError as err: _LOGGER.warning("Error purging history: %s", err) return True + + +def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list: + """Return a list of event ids to purge.""" + events = ( + session.query(Events.event_id) + .filter(Events.time_fired < purge_before) + .limit(MAX_ROWS_TO_PURGE) + .all() + ) + _LOGGER.debug("Selected %s event ids to remove", len(events)) + return [event.event_id for event in events] + + +def _select_state_ids_to_purge( + session: Session, purge_before: datetime, event_ids: list +) -> list: + """Return a list of state ids to purge.""" + if not event_ids: + return [] + states = ( + session.query(States.state_id) + .filter(States.last_updated < purge_before) + .filter(States.event_id.in_(event_ids)) + .all() + ) + _LOGGER.debug("Selected %s state ids to remove", len(states)) + return [state.state_id for state in states] + + +def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) -> None: + # Update old_state_id to NULL before deleting to ensure + # the delete does not fail due to a foreign key constraint + # since some databases (MSSQL) cannot do the ON DELETE SET NULL + # for us. 
+    disconnected_rows = (
+        session.query(States)
+        .filter(States.old_state_id.in_(state_ids))
+        .update({"old_state_id": None}, synchronize_session=False)
+    )
+    _LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)
+
+
+def _purge_state_ids(session: Session, state_ids: list) -> None:
+    """Delete by state id."""
+    deleted_rows = (
+        session.query(States)
+        .filter(States.state_id.in_(state_ids))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s states", deleted_rows)
+
+
+def _purge_event_ids(session: Session, event_ids: list) -> None:
+    """Delete by event id."""
+    deleted_rows = (
+        session.query(Events)
+        .filter(Events.event_id.in_(event_ids))
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s events", deleted_rows)
+
+
+def _purge_old_recorder_runs(
+    instance: Recorder, session: Session, purge_before: datetime
+) -> None:
+    """Purge all old recorder runs."""
+    # Recorder runs is small, no need to batch run it
+    deleted_rows = (
+        session.query(RecorderRuns)
+        .filter(RecorderRuns.start < purge_before)
+        .filter(RecorderRuns.run_id != instance.run_info.run_id)
+        .delete(synchronize_session=False)
+    )
+    _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
diff --git a/homeassistant/components/recorder/repack.py b/homeassistant/components/recorder/repack.py
new file mode 100644
index 00000000000..68d7d5954c9
--- /dev/null
+++ b/homeassistant/components/recorder/repack.py
@@ -0,0 +1,35 @@
+"""Purge repack helper."""
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from . import Recorder
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def repack_database(instance: Recorder) -> None:
+    """Repack based on engine type."""
+
+    # Execute sqlite command to free up space on disk
+    if instance.engine.dialect.name == "sqlite":
+        _LOGGER.debug("Vacuuming SQL DB to free space")
+        instance.engine.execute("VACUUM")
+        return
+
+    # Execute postgresql vacuum command to free up space on disk
+    if instance.engine.dialect.name == "postgresql":
+        _LOGGER.debug("Vacuuming SQL DB to free space")
+        with instance.engine.connect().execution_options(
+            isolation_level="AUTOCOMMIT"
+        ) as conn:
+            conn.execute("VACUUM")
+        return
+
+    # Optimize mysql / mariadb tables to free up space on disk
+    if instance.engine.dialect.name == "mysql":
+        _LOGGER.debug("Optimizing SQL DB to free space")
+        instance.engine.execute("OPTIMIZE TABLE states, events, recorder_runs")
+        return
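One subtlety in `repack.py`: PostgreSQL refuses to run `VACUUM` inside a transaction block, and a SQLAlchemy connection opens one implicitly, which is why the PostgreSQL branch switches the connection to `AUTOCOMMIT` instead of calling `engine.execute("VACUUM")` directly (the old code did exactly that and also never matched, since `"postgresql"` is a dialect name, not a driver name). A minimal sketch of the pattern; the DSN is a placeholder:

```python
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:secret@localhost/hass")  # placeholder DSN

# VACUUM cannot run inside a transaction block on PostgreSQL, so drop down
# to driver-level autocommit for this single statement.
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
    conn.execute("VACUUM")
```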
diff --git a/setup.cfg b/setup.cfg
index 98a01278838..dbdb61b5fcf 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -43,7 +43,7 @@ warn_redundant_casts = true
 warn_unused_configs = true
 
-[mypy-homeassistant.block_async_io,homeassistant.bootstrap,homeassistant.components,homeassistant.config_entries,homeassistant.config,homeassistant.const,homeassistant.core,homeassistant.data_entry_flow,homeassistant.exceptions,homeassistant.__init__,homeassistant.loader,homeassistant.__main__,homeassistant.requirements,homeassistant.runner,homeassistant.setup,homeassistant.util,homeassistant.auth.*,homeassistant.components.automation.*,homeassistant.components.binary_sensor.*,homeassistant.components.bond.*,homeassistant.components.calendar.*,homeassistant.components.cover.*,homeassistant.components.device_automation.*,homeassistant.components.frontend.*,homeassistant.components.geo_location.*,homeassistant.components.group.*,homeassistant.components.history.*,homeassistant.components.http.*,homeassistant.components.huawei_lte.*,homeassistant.components.hyperion.*,homeassistant.components.image_processing.*,homeassistant.components.integration.*,homeassistant.components.light.*,homeassistant.components.lock.*,homeassistant.components.mailbox.*,homeassistant.components.media_player.*,homeassistant.components.notify.*,homeassistant.components.number.*,homeassistant.components.persistent_notification.*,homeassistant.components.proximity.*,homeassistant.components.remote.*,homeassistant.components.scene.*,homeassistant.components.sensor.*,homeassistant.components.slack.*,homeassistant.components.sun.*,homeassistant.components.switch.*,homeassistant.components.systemmonitor.*,homeassistant.components.tts.*,homeassistant.components.vacuum.*,homeassistant.components.water_heater.*,homeassistant.components.weather.*,homeassistant.components.websocket_api.*,homeassistant.components.zone.*,homeassistant.components.zwave_js.*,homeassistant.helpers.*,homeassistant.scripts.*,homeassistant.util.*,tests.components.hyperion.*]
+[mypy-homeassistant.block_async_io,homeassistant.bootstrap,homeassistant.components,homeassistant.config_entries,homeassistant.config,homeassistant.const,homeassistant.core,homeassistant.data_entry_flow,homeassistant.exceptions,homeassistant.__init__,homeassistant.loader,homeassistant.__main__,homeassistant.requirements,homeassistant.runner,homeassistant.setup,homeassistant.util,homeassistant.auth.*,homeassistant.components.automation.*,homeassistant.components.binary_sensor.*,homeassistant.components.bond.*,homeassistant.components.calendar.*,homeassistant.components.cover.*,homeassistant.components.device_automation.*,homeassistant.components.frontend.*,homeassistant.components.geo_location.*,homeassistant.components.group.*,homeassistant.components.history.*,homeassistant.components.http.*,homeassistant.components.huawei_lte.*,homeassistant.components.hyperion.*,homeassistant.components.image_processing.*,homeassistant.components.integration.*,homeassistant.components.light.*,homeassistant.components.lock.*,homeassistant.components.mailbox.*,homeassistant.components.media_player.*,homeassistant.components.notify.*,homeassistant.components.number.*,homeassistant.components.persistent_notification.*,homeassistant.components.proximity.*,homeassistant.components.recorder.purge,homeassistant.components.recorder.repack,homeassistant.components.remote.*,homeassistant.components.scene.*,homeassistant.components.sensor.*,homeassistant.components.slack.*,homeassistant.components.sun.*,homeassistant.components.switch.*,homeassistant.components.systemmonitor.*,homeassistant.components.tts.*,homeassistant.components.vacuum.*,homeassistant.components.water_heater.*,homeassistant.components.weather.*,homeassistant.components.websocket_api.*,homeassistant.components.zone.*,homeassistant.components.zwave_js.*,homeassistant.helpers.*,homeassistant.scripts.*,homeassistant.util.*,tests.components.hyperion.*]
 strict = true
 ignore_errors = false
 warn_unreachable = true
 
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index 791bd84b11b..aaf53000865 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -1,7 +1,6 @@
 """Test data purging."""
-from datetime import datetime, timedelta
+from datetime import timedelta
 import json
-from unittest.mock import patch
 
 from homeassistant.components import recorder
 from homeassistant.components.recorder.const import DATA_INSTANCE
@@ -22,16 +21,21 @@ def test_purge_old_states(hass, hass_recorder):
     with session_scope(hass=hass) as session:
         states = session.query(States)
         assert states.count() == 6
+        assert states[0].old_state_id is None
+        assert states[-1].old_state_id == states[-2].state_id
+
+        events = session.query(Events).filter(Events.event_type == "state_changed")
+        assert events.count() == 6
 
         # run purge_old_data()
-        finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
-        assert not finished
-        assert states.count() == 4
-
         finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
         assert not finished
         assert states.count() == 2
 
+        states_after_purge = session.query(States)
+        assert states_after_purge[1].old_state_id == states_after_purge[0].state_id
+        assert states_after_purge[0].old_state_id is None
+
         finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
         assert finished
         assert states.count() == 2
@@ -47,10 +51,6 @@ def test_purge_old_events(hass, hass_recorder):
         assert events.count() == 6
 
         # run purge_old_data()
-        finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
-        assert not finished
-        assert events.count() == 4
-
         finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
         assert not finished
         assert events.count() == 2
@@ -72,12 +72,15 @@ def test_purge_old_recorder_runs(hass, hass_recorder):
         assert recorder_runs.count() == 7
 
         # run purge_old_data()
+        finished = purge_old_data(hass.data[DATA_INSTANCE], 0, repack=False)
+        assert not finished
+
         finished = purge_old_data(hass.data[DATA_INSTANCE], 0, repack=False)
         assert finished
         assert recorder_runs.count() == 1
 
 
-def test_purge_method(hass, hass_recorder):
+def test_purge_method(hass, hass_recorder, caplog):
     """Test purge method."""
     hass = hass_recorder()
     service_data = {"keep_days": 4}
@@ -131,23 +134,19 @@ def test_purge_method(hass, hass_recorder):
     assert not ("EVENT_TEST_PURGE" in (event.event_type for event in events.all()))
 
     # run purge method - correct service data, with repack
-    with patch("homeassistant.components.recorder.purge._LOGGER") as mock_logger:
-        service_data["repack"] = True
-        hass.services.call("recorder", "purge", service_data=service_data)
-        hass.block_till_done()
-        hass.data[DATA_INSTANCE].block_till_done()
-        wait_recording_done(hass)
-        assert (
-            mock_logger.debug.mock_calls[5][1][0]
-            == "Vacuuming SQL DB to free space"
-        )
+    service_data["repack"] = True
+    hass.services.call("recorder", "purge", service_data=service_data)
+    hass.block_till_done()
+    hass.data[DATA_INSTANCE].block_till_done()
+    wait_recording_done(hass)
+    assert "Vacuuming SQL DB to free space" in caplog.text
 
 
 def _add_test_states(hass):
     """Add multiple states to the db for testing."""
-    now = datetime.now()
-    five_days_ago = now - timedelta(days=5)
-    eleven_days_ago = now - timedelta(days=11)
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
     attributes = {"test_attr": 5, "test_attr_10": "nice"}
 
     hass.block_till_done()
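The logging assertion change above is more than cosmetic: the vacuum message is now emitted by `homeassistant.components.recorder.repack`'s own `_LOGGER`, so patching `purge._LOGGER` would no longer see it, and indexing `mock_calls[5]` breaks whenever a debug line is added or reordered. pytest's built-in `caplog` fixture captures records from all loggers, so a substring assertion stays stable. A minimal standalone illustration of the fixture; the test name is invented:

```python
import logging


def test_vacuum_message_is_logged(caplog):
    caplog.set_level(logging.DEBUG)
    # caplog captures propagated records from any logger, so the assertion
    # does not care which module emitted the message or in what order.
    logging.getLogger("homeassistant.components.recorder.repack").debug(
        "Vacuuming SQL DB to free space"
    )
    assert "Vacuuming SQL DB to free space" in caplog.text
```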
@@ -155,6 +154,7 @@ def _add_test_states(hass):
     wait_recording_done(hass)
 
     with recorder.session_scope(hass=hass) as session:
+        old_state_id = None
         for event_id in range(6):
             if event_id < 2:
                 timestamp = eleven_days_ago
@@ -163,28 +163,39 @@ def _add_test_states(hass):
                 timestamp = five_days_ago
                 state = "purgeme"
             else:
-                timestamp = now
+                timestamp = utcnow
                 state = "dontpurgeme"
 
-            session.add(
-                States(
-                    entity_id="test.recorder2",
-                    domain="sensor",
-                    state=state,
-                    attributes=json.dumps(attributes),
-                    last_changed=timestamp,
-                    last_updated=timestamp,
-                    created=timestamp,
-                    event_id=event_id + 1000,
-                )
+            event = Events(
+                event_type="state_changed",
+                event_data="{}",
+                origin="LOCAL",
+                created=timestamp,
+                time_fired=timestamp,
             )
+            session.add(event)
+            session.flush()
+            state = States(
+                entity_id="test.recorder2",
+                domain="sensor",
+                state=state,
+                attributes=json.dumps(attributes),
+                last_changed=timestamp,
+                last_updated=timestamp,
+                created=timestamp,
+                event_id=event.event_id,
+                old_state_id=old_state_id,
+            )
+            session.add(state)
+            session.flush()
+            old_state_id = state.state_id
 
 
 def _add_test_events(hass):
     """Add a few events for testing."""
-    now = datetime.now()
-    five_days_ago = now - timedelta(days=5)
-    eleven_days_ago = now - timedelta(days=11)
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
     event_data = {"test_attr": 5, "test_attr_10": "nice"}
 
     hass.block_till_done()
@@ -200,7 +211,7 @@ def _add_test_events(hass):
             timestamp = five_days_ago
             event_type = "EVENT_TEST_PURGE"
         else:
-            timestamp = now
+            timestamp = utcnow
             event_type = "EVENT_TEST"
 
         session.add(
@@ -216,9 +227,9 @@ def _add_test_events(hass):
 
 def _add_test_recorder_runs(hass):
     """Add a few recorder_runs for testing."""
-    now = datetime.now()
-    five_days_ago = now - timedelta(days=5)
-    eleven_days_ago = now - timedelta(days=11)
+    utcnow = dt_util.utcnow()
+    five_days_ago = utcnow - timedelta(days=5)
+    eleven_days_ago = utcnow - timedelta(days=11)
 
     hass.block_till_done()
     hass.data[DATA_INSTANCE].block_till_done()
@@ -231,7 +242,7 @@ def _add_test_recorder_runs(hass):
         elif rec_id < 4:
             timestamp = five_days_ago
         else:
-            timestamp = now
+            timestamp = utcnow
 
         session.add(
             RecorderRuns(
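A final note on the `_add_test_states` rewrite: each loop iteration calls `session.flush()` so the database assigns `event.event_id` and `state.state_id` immediately, letting the next row's `old_state_id` point at the previous state before anything is committed. A self-contained sketch of that flush pattern; the `Item` model is invented for illustration:

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Item(Base):
    """Illustrative stand-in for States."""

    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    prev_id = Column(Integer)  # chains to the previous row, like old_state_id


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

prev_id = None
for _ in range(3):
    item = Item(prev_id=prev_id)
    session.add(item)
    session.flush()  # emits the INSERT now, so item.id is populated pre-commit
    prev_id = item.id
session.commit()
```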