From aa9b99713c70935c8c0b2cda436b8eb5d214d984 Mon Sep 17 00:00:00 2001
From: PeteBa
Date: Sat, 22 May 2021 16:30:05 +0100
Subject: [PATCH] Add purge_entities service call to recorder (#48069)

---
 homeassistant/components/recorder/__init__.py |  58 ++++++++-
 homeassistant/components/recorder/purge.py    |  21 +++-
 .../components/recorder/services.yaml         |  27 +++-
 tests/components/recorder/test_init.py        |   2 +
 tests/components/recorder/test_purge.py       | 116 ++++++++++++++++++
 5 files changed, 219 insertions(+), 5 deletions(-)

diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py
index 91f29225cd2..0d6dddfa2d5 100644
--- a/homeassistant/components/recorder/__init__.py
+++ b/homeassistant/components/recorder/__init__.py
@@ -34,6 +34,7 @@ from homeassistant.helpers.entityfilter import (
     INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA,
     INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER,
     convert_include_exclude_filter,
+    generate_filter,
 )
 from homeassistant.helpers.event import (
     async_track_time_change,
@@ -42,6 +43,7 @@ from homeassistant.helpers.event import (
 from homeassistant.helpers.integration_platform import (
     async_process_integration_platforms,
 )
+from homeassistant.helpers.service import async_extract_entity_ids
 from homeassistant.helpers.typing import ConfigType
 from homeassistant.loader import bind_hass
 import homeassistant.util.dt as dt_util
@@ -63,6 +65,7 @@ from .util import (
 _LOGGER = logging.getLogger(__name__)

 SERVICE_PURGE = "purge"
+SERVICE_PURGE_ENTITIES = "purge_entities"
 SERVICE_ENABLE = "enable"
 SERVICE_DISABLE = "disable"

@@ -79,6 +82,18 @@ SERVICE_PURGE_SCHEMA = vol.Schema(
         vol.Optional(ATTR_APPLY_FILTER, default=False): cv.boolean,
     }
 )
+
+ATTR_DOMAINS = "domains"
+ATTR_ENTITY_GLOBS = "entity_globs"
+
+SERVICE_PURGE_ENTITIES_SCHEMA = vol.Schema(
+    {
+        vol.Optional(ATTR_DOMAINS, default=[]): vol.All(cv.ensure_list, [cv.string]),
+        vol.Optional(ATTR_ENTITY_GLOBS, default=[]): vol.All(
+            cv.ensure_list, [cv.string]
+        ),
+    }
+).extend(cv.ENTITY_SERVICE_FIELDS)

 SERVICE_ENABLE_SCHEMA = vol.Schema({})
 SERVICE_DISABLE_SCHEMA = vol.Schema({})
@@ -252,11 +267,29 @@ def _async_register_services(hass, instance):
         DOMAIN, SERVICE_PURGE, async_handle_purge_service, schema=SERVICE_PURGE_SCHEMA
     )

-    async def async_handle_enable_sevice(service):
+    async def async_handle_purge_entities_service(service):
+        """Handle calls to the purge entities service."""
+        entity_ids = await async_extract_entity_ids(hass, service)
+        domains = service.data.get(ATTR_DOMAINS, [])
+        entity_globs = service.data.get(ATTR_ENTITY_GLOBS, [])
+
+        instance.do_adhoc_purge_entities(entity_ids, domains, entity_globs)
+
+    hass.services.async_register(
+        DOMAIN,
+        SERVICE_PURGE_ENTITIES,
+        async_handle_purge_entities_service,
+        schema=SERVICE_PURGE_ENTITIES_SCHEMA,
+    )
+
+    async def async_handle_enable_service(service):
         instance.set_enable(True)

     hass.services.async_register(
-        DOMAIN, SERVICE_ENABLE, async_handle_enable_sevice, schema=SERVICE_ENABLE_SCHEMA
+        DOMAIN,
+        SERVICE_ENABLE,
+        async_handle_enable_service,
+        schema=SERVICE_ENABLE_SCHEMA,
     )

     async def async_handle_disable_service(service):
@@ -278,6 +311,12 @@ class PurgeTask(NamedTuple):
     apply_filter: bool


+class PurgeEntitiesTask(NamedTuple):
+    """Object to store entity information about purge task."""
+
+    entity_filter: Callable[[str], bool]
+
+
 class PerodicCleanupTask:
     """An object to insert into the recorder to trigger cleanup tasks when auto purge is disabled."""

@@ -414,6 +453,11 @@ class Recorder(threading.Thread):
         self.queue.put(PurgeTask(keep_days, repack, apply_filter))

+    def do_adhoc_purge_entities(self, entity_ids, domains, entity_globs):
+        """Trigger an adhoc purge of requested entities."""
+        entity_filter = generate_filter(domains, entity_ids, [], [], entity_globs)
+        self.queue.put(PurgeEntitiesTask(entity_filter))
+
     def do_adhoc_statistics(self, **kwargs):
         """Trigger an adhoc statistics run."""
         start = kwargs.get("start")
@@ -663,6 +707,13 @@ class Recorder(threading.Thread):
         # Schedule a new purge task if this one didn't finish
         self.queue.put(PurgeTask(keep_days, repack, apply_filter))

+    def _run_purge_entities(self, entity_filter):
+        """Purge entities from the database."""
+        if purge.purge_entity_data(self, entity_filter):
+            return
+        # Schedule a new purge task if this one didn't finish
+        self.queue.put(PurgeEntitiesTask(entity_filter))
+
     def _run_statistics(self, start):
         """Run statistics task."""
         if statistics.compile_statistics(self, start):
@@ -675,6 +726,9 @@ class Recorder(threading.Thread):
         if isinstance(event, PurgeTask):
             self._run_purge(event.keep_days, event.repack, event.apply_filter)
             return
+        if isinstance(event, PurgeEntitiesTask):
+            self._run_purge_entities(event.entity_filter)
+            return
         if isinstance(event, PerodicCleanupTask):
             perodic_db_cleanups(self)
             return
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 62914c01de7..e1cf15e331d 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -3,7 +3,7 @@ from __future__ import annotations

 from datetime import datetime, timedelta
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Callable

 from sqlalchemy.orm.session import Session
 from sqlalchemy.sql.expression import distinct
@@ -195,3 +195,22 @@ def _purge_filtered_events(session: Session, excluded_event_types: list[str]) ->
     state_ids: list[int] = [state.state_id for state in states]
     _purge_state_ids(session, state_ids)
     _purge_event_ids(session, event_ids)
+
+
+@retryable_database_job("purge")
+def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
+    """Purge states and events of specified entities."""
+    with session_scope(session=instance.get_session()) as session:  # type: ignore
+        selected_entity_ids: list[str] = [
+            entity_id
+            for (entity_id,) in session.query(distinct(States.entity_id)).all()
+            if entity_filter(entity_id)
+        ]
+        _LOGGER.debug("Purging entity data for %s", selected_entity_ids)
+        if len(selected_entity_ids) > 0:
+            # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
+            _purge_filtered_states(session, selected_entity_ids)
+            _LOGGER.debug("Purging entity data hasn't fully completed yet")
+            return False
+
+    return True
diff --git a/homeassistant/components/recorder/services.yaml b/homeassistant/components/recorder/services.yaml
index dcd8477d4bd..67879867cc7 100644
--- a/homeassistant/components/recorder/services.yaml
+++ b/homeassistant/components/recorder/services.yaml
@@ -18,8 +18,7 @@ purge:
     repack:
       name: Repack
-      description:
-        Attempt to save disk space by rewriting the entire database file.
+      description: Attempt to save disk space by rewriting the entire database file.
       example: true
       default: false
       selector:
@@ -33,6 +32,30 @@ purge:
         boolean:

+purge_entities:
+  name: Purge Entities
+  description: Start a purge task to remove specific entities from your database.
+  target:
+    entity: {}
+  fields:
+    domains:
+      name: Domains to remove
+      description: List the domains that need to be removed from the recorder database.
+      example: "sun"
+      required: false
+      default: []
+      selector:
+        object:
+
+    entity_globs:
+      name: Entity Globs to remove
+      description: List the glob patterns used to select entities for removal from the recorder database.
+      example: "domain*.object_id*"
+      required: false
+      default: []
+      selector:
+        object:
+
 disable:
   name: Disable
   description: Stop the recording of events and state changes
diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py
index 5d4620ef29c..195e56dc748 100644
--- a/tests/components/recorder/test_init.py
+++ b/tests/components/recorder/test_init.py
@@ -17,6 +17,7 @@ from homeassistant.components.recorder import (
     SERVICE_DISABLE,
     SERVICE_ENABLE,
     SERVICE_PURGE,
+    SERVICE_PURGE_ENTITIES,
     SQLITE_URL_PREFIX,
     Recorder,
     run_information,
@@ -822,6 +823,7 @@ def test_has_services(hass_recorder):
     assert hass.services.has_service(DOMAIN, SERVICE_DISABLE)
     assert hass.services.has_service(DOMAIN, SERVICE_ENABLE)
     assert hass.services.has_service(DOMAIN, SERVICE_PURGE)
+    assert hass.services.has_service(DOMAIN, SERVICE_PURGE_ENTITIES)


 def test_service_disable_events_not_recording(hass, hass_recorder):
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index cdbe6e3c338..6727b4da495 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -653,6 +653,122 @@ async def test_purge_filtered_events_state_changed(
     assert session.query(States).get(63).old_state_id == 62  # should have been kept


+async def test_purge_entities(
+    hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT
+):
+    """Test purging of specific entities."""
+    instance = await async_setup_recorder_instance(hass)
+
+    async def _purge_entities(hass, entity_ids, domains, entity_globs):
+        service_data = {
+            "entity_id": entity_ids,
+            "domains": domains,
+            "entity_globs": entity_globs,
+        }
+
+        await hass.services.async_call(
+            recorder.DOMAIN, recorder.SERVICE_PURGE_ENTITIES, service_data
+        )
+        await hass.async_block_till_done()
+
+        await async_recorder_block_till_done(hass, instance)
+        await async_wait_purge_done(hass, instance)
+
+    def _add_purge_records(hass: HomeAssistant) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            # Add states and state_changed events that should be purged
+            for days in range(1, 4):
+                timestamp = dt_util.utcnow() - timedelta(days=days)
+                for event_id in range(1000, 1020):
+                    _add_state_and_state_changed_event(
+                        session,
+                        "sensor.purge_entity",
+                        "purgeme",
+                        timestamp,
+                        event_id * days,
+                    )
+                timestamp = dt_util.utcnow() - timedelta(days=days)
+                for event_id in range(10000, 10020):
+                    _add_state_and_state_changed_event(
+                        session,
+                        "purge_domain.entity",
+                        "purgeme",
+                        timestamp,
+                        event_id * days,
+                    )
+                timestamp = dt_util.utcnow() - timedelta(days=days)
+                for event_id in range(100000, 100020):
+                    _add_state_and_state_changed_event(
+                        session,
+                        "binary_sensor.purge_glob",
+                        "purgeme",
+                        timestamp,
+                        event_id * days,
+                    )
+
+    def _add_keep_records(hass: HomeAssistant) -> None:
+        with recorder.session_scope(hass=hass) as session:
+            # Add states and state_changed events that should be kept
+            timestamp = dt_util.utcnow() - timedelta(days=2)
+            for event_id in range(200, 210):
+                _add_state_and_state_changed_event(
+                    session,
+                    "sensor.keep",
+                    "keep",
+                    timestamp,
+                    event_id,
+                )
+
+    _add_purge_records(hass)
+    _add_keep_records(hass)
+
+    # Confirm standard service call
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        assert states.count() == 190
+
+        await _purge_entities(
+            hass, "sensor.purge_entity", "purge_domain", "*purge_glob"
+        )
+        assert states.count() == 10
+
+        states_sensor_kept = session.query(States).filter(
+            States.entity_id == "sensor.keep"
+        )
+        assert states_sensor_kept.count() == 10
+
+    _add_purge_records(hass)
+
+    # Confirm each parameter purges only the associated records
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        assert states.count() == 190
+
+        await _purge_entities(hass, "sensor.purge_entity", [], [])
+        assert states.count() == 130
+
+        await _purge_entities(hass, [], "purge_domain", [])
+        assert states.count() == 70
+
+        await _purge_entities(hass, [], [], "*purge_glob")
+        assert states.count() == 10
+
+        states_sensor_kept = session.query(States).filter(
+            States.entity_id == "sensor.keep"
+        )
+        assert states_sensor_kept.count() == 10
+
+    _add_purge_records(hass)
+
+    # Confirm calling service without arguments matches all records (default filter behaviour)
+    with session_scope(hass=hass) as session:
+        states = session.query(States)
+        assert states.count() == 190
+
+        await _purge_entities(hass, [], [], [])
+        assert states.count() == 0
+
+
 async def _add_test_states(hass: HomeAssistant, instance: recorder.Recorder):
     """Add multiple states to the db for testing."""
     utcnow = dt_util.utcnow()
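
Usage sketch (not part of the patch): a minimal example of invoking the new service from async code, mirroring the helper in test_purge.py above. It assumes a running Home Assistant instance with the recorder set up; "sensor.old_reading" is a hypothetical entity id used only for illustration, while the "sun" domain and "domain*.object_id*" glob are taken from the examples in services.yaml. The call only enqueues the work: the purge itself runs on the recorder thread via a queued PurgeEntitiesTask and is retried until purge_entity_data reports completion.

    # Ask the recorder to purge history for specific entities, a whole domain,
    # and anything matching a glob pattern. Keys match SERVICE_PURGE_ENTITIES_SCHEMA.
    await hass.services.async_call(
        "recorder",
        "purge_entities",
        {
            "entity_id": ["sensor.old_reading"],  # hypothetical entity id
            "domains": ["sun"],
            "entity_globs": ["domain*.object_id*"],
        },
    )
    await hass.async_block_till_done()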