home-assistant/core (https://github.com/home-assistant/core.git)
Fix filtered purge not removing newer events (#89721)
This commit is contained in: commit b906d67c1e, parent 858fc30fcd
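Summary of the change: the filtered purge is now batched against a fixed cutoff. `purge_entity_data` takes a `purge_before` datetime, `_purge_filtered_data` stamps `now_timestamp = time.time()`, and both `_purge_filtered_states` and `_purge_filtered_events` delete at most `SQLITE_MAX_BIND_VARS` rows older than that cutoff per pass, returning True only once nothing is left; `PurgeEntitiesTask` re-queues itself until then. Excluded event types are resolved to `event_type_ids` through the `event_type_manager` instead of matching the legacy `Events.event_type` string column; newer rows no longer populate that legacy column, which is why newer events were previously never purged.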
@@ -142,12 +142,10 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
         hass_config_path=hass.config.path(DEFAULT_DB_FILE)
     )
     exclude = conf[CONF_EXCLUDE]
-    exclude_t = exclude.get(CONF_EVENT_TYPES, [])
-    if EVENT_STATE_CHANGED in exclude_t:
-        _LOGGER.warning(
-            "State change events are excluded, recorder will not record state changes."
-            "This will become an error in Home Assistant Core 2022.2"
-        )
+    exclude_event_types: set[str] = set(exclude.get(CONF_EVENT_TYPES, []))
+    if EVENT_STATE_CHANGED in exclude_event_types:
+        _LOGGER.error("State change events cannot be excluded, use a filter instead")
+        exclude_event_types.remove(EVENT_STATE_CHANGED)
     instance = hass.data[DATA_INSTANCE] = Recorder(
         hass=hass,
         auto_purge=auto_purge,
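Behavior change in the hunk above: excluding `state_changed` via `exclude.event_types` previously emitted a deprecation warning while still disabling state recording; it now logs an error and the entry is removed from `exclude_event_types`, so state changes keep being recorded and an entity filter must be used instead.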
@@ -158,7 +156,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
         db_max_retries=db_max_retries,
         db_retry_wait=db_retry_wait,
         entity_filter=entity_filter,
-        exclude_t=exclude_t,
+        exclude_event_types=exclude_event_types,
         exclude_attributes_by_domain=exclude_attributes_by_domain,
     )
     instance.async_initialize()
@@ -181,7 +181,7 @@ class Recorder(threading.Thread):
         db_max_retries: int,
         db_retry_wait: int,
         entity_filter: Callable[[str], bool],
-        exclude_t: list[str],
+        exclude_event_types: set[str],
         exclude_attributes_by_domain: dict[str, set[str]],
     ) -> None:
         """Initialize the recorder."""
@@ -214,7 +214,7 @@ class Recorder(threading.Thread):
         # it can be used to see if an entity is being recorded and is called
         # by is_entity_recorder and the sensor recorder.
         self.entity_filter = entity_filter
-        self.exclude_t = set(exclude_t)
+        self.exclude_event_types = exclude_event_types

         self.schema_version = 0
         self._commits_without_expire = 0
@@ -388,7 +388,7 @@ class Recorder(threading.Thread):
     @callback
     def _async_event_filter(self, event: Event) -> bool:
         """Filter events."""
-        if event.event_type in self.exclude_t:
+        if event.event_type in self.exclude_event_types:
             return False

         if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None:
@@ -435,7 +435,8 @@ class States(Base):
     def __repr__(self) -> str:
         """Return string representation of instance for debugging."""
         return (
-            f"<recorder.States(id={self.state_id}, entity_id='{self.entity_id}',"
+            f"<recorder.States(id={self.state_id}, entity_id='{self.entity_id}'"
+            f" metadata_id={self.metadata_id},"
             f" state='{self.state}', event_id='{self.event_id}',"
             f" last_updated='{self._last_updated_isotime}',"
             f" old_state_id={self.old_state_id}, attributes_id={self.attributes_id})>"
@@ -1,21 +1,20 @@
 """Purge old data helper."""
 from __future__ import annotations

-from collections.abc import Callable, Iterable
+from collections.abc import Callable
 from datetime import datetime
 from itertools import zip_longest
 import logging
+import time
 from typing import TYPE_CHECKING

 from sqlalchemy.engine.row import Row
 from sqlalchemy.orm.session import Session
-from sqlalchemy.sql.expression import distinct

-from homeassistant.const import EVENT_STATE_CHANGED
 import homeassistant.util.dt as dt_util

 from .const import SQLITE_MAX_BIND_VARS
-from .db_schema import Events, StateAttributes, States, StatesMeta
+from .db_schema import Events, States, StatesMeta
 from .models import DatabaseEngine
 from .queries import (
     attributes_ids_exist_in_states,
@@ -144,11 +143,9 @@ def _purge_legacy_format(
     ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
         session, purge_before
     )
-    if state_ids:
-        _purge_state_ids(instance, session, state_ids)
+    _purge_state_ids(instance, session, state_ids)
     _purge_unused_attributes_ids(instance, session, attributes_ids)
-    if event_ids:
-        _purge_event_ids(session, event_ids)
+    _purge_event_ids(session, event_ids)
     _purge_unused_data_ids(instance, session, data_ids)
     return bool(event_ids or state_ids or attributes_ids or data_ids)

@@ -448,6 +445,8 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(

 def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
     """Disconnect states and delete by state id."""
+    if not state_ids:
+        return

     # Update old_state_id to NULL before deleting to ensure
     # the delete does not fail due to a foreign key constraint
@@ -559,8 +558,10 @@ def _purge_short_term_statistics(
     _LOGGER.debug("Deleted %s short term statistics", deleted_rows)


-def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None:
+def _purge_event_ids(session: Session, event_ids: set[int]) -> None:
     """Delete by event id."""
+    if not event_ids:
+        return
     deleted_rows = session.execute(delete_event_rows(event_ids))
     _LOGGER.debug("Deleted %s events", deleted_rows)

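The guard clauses added in the last two hunks make `_purge_state_ids` and `_purge_event_ids` no-ops for empty sets, which is what allowed the `if state_ids:` and `if event_ids:` checks to be dropped from `_purge_legacy_format` in the earlier hunk.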
@@ -619,9 +620,11 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     _LOGGER.debug("Cleanup filtered data")
     database_engine = instance.database_engine
     assert database_engine is not None
+    now_timestamp = time.time()

     # Check if excluded entity_ids are in database
     entity_filter = instance.entity_filter
+    has_more_states_to_purge = False
     excluded_metadata_ids: list[str] = [
         metadata_id
         for (metadata_id, entity_id) in session.query(
@@ -629,92 +632,123 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
         ).all()
         if not entity_filter(entity_id)
     ]
-    if len(excluded_metadata_ids) > 0:
-        _purge_filtered_states(
-            instance, session, excluded_metadata_ids, database_engine
+    if excluded_metadata_ids:
+        has_more_states_to_purge = _purge_filtered_states(
+            instance, session, excluded_metadata_ids, database_engine, now_timestamp
         )
-        return False

     # Check if excluded event_types are in database
-    excluded_event_types: list[str] = [
-        event_type
-        for (event_type,) in session.query(distinct(Events.event_type)).all()
-        if event_type in instance.exclude_t
-    ]
-    if len(excluded_event_types) > 0:
-        _purge_filtered_events(instance, session, excluded_event_types)
-        return False
+    has_more_events_to_purge = False
+    if (
+        event_type_to_event_type_ids := instance.event_type_manager.get_many(
+            instance.exclude_event_types, session
+        )
+    ) and (
+        excluded_event_type_ids := [
+            event_type_id
+            for event_type_id in event_type_to_event_type_ids.values()
+            if event_type_id is not None
+        ]
+    ):
+        has_more_events_to_purge = _purge_filtered_events(
+            instance, session, excluded_event_type_ids, now_timestamp
+        )

-    return True
+    # Purge has completed if there are not more state or events to purge
+    return not (has_more_states_to_purge or has_more_events_to_purge)


 def _purge_filtered_states(
     instance: Recorder,
     session: Session,
-    excluded_metadata_ids: list[str],
+    metadata_ids_to_purge: list[str],
     database_engine: DatabaseEngine,
-) -> None:
-    """Remove filtered states and linked events."""
+    purge_before_timestamp: float,
+) -> bool:
+    """Remove filtered states and linked events.
+
+    Return true if all states are purged
+    """
     state_ids: tuple[int, ...]
     attributes_ids: tuple[int, ...]
     event_ids: tuple[int, ...]
-    state_ids, attributes_ids, event_ids = zip(
-        *(
-            session.query(States.state_id, States.attributes_id, States.event_id)
-            .filter(States.metadata_id.in_(excluded_metadata_ids))
-            .limit(SQLITE_MAX_BIND_VARS)
-            .all()
-        )
+    to_purge = list(
+        session.query(States.state_id, States.attributes_id, States.event_id)
+        .filter(States.metadata_id.in_(metadata_ids_to_purge))
+        .filter(States.last_updated_ts < purge_before_timestamp)
+        .limit(SQLITE_MAX_BIND_VARS)
+        .all()
     )
-    filtered_event_ids = [id_ for id_ in event_ids if id_ is not None]
+    if not to_purge:
+        return True
+    state_ids, attributes_ids, event_ids = zip(*to_purge)
+    filtered_event_ids = {id_ for id_ in event_ids if id_ is not None}
     _LOGGER.debug(
         "Selected %s state_ids to remove that should be filtered", len(state_ids)
     )
     _purge_state_ids(instance, session, set(state_ids))
+    # These are legacy events that are linked to a state that are no longer
+    # created but since we did not remove them when we stopped adding new ones
+    # we will need to purge them here.
     _purge_event_ids(session, filtered_event_ids)
     unused_attribute_ids_set = _select_unused_attributes_ids(
         session, {id_ for id_ in attributes_ids if id_ is not None}, database_engine
     )
     _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
+    return False


 def _purge_filtered_events(
-    instance: Recorder, session: Session, excluded_event_types: list[str]
-) -> None:
-    """Remove filtered events and linked states."""
+    instance: Recorder,
+    session: Session,
+    excluded_event_type_ids: list[int],
+    purge_before_timestamp: float,
+) -> bool:
+    """Remove filtered events and linked states.
+
+    Return true if all events are purged.
+    """
     database_engine = instance.database_engine
     assert database_engine is not None
-    event_ids, data_ids = zip(
-        *(
-            session.query(Events.event_id, Events.data_id)
-            .filter(Events.event_type.in_(excluded_event_types))
-            .limit(SQLITE_MAX_BIND_VARS)
-            .all()
-        )
-    )
+    to_purge = list(
+        session.query(Events.event_id, Events.data_id)
+        .filter(Events.event_type_id.in_(excluded_event_type_ids))
+        .filter(Events.time_fired_ts < purge_before_timestamp)
+        .limit(SQLITE_MAX_BIND_VARS)
+        .all()
+    )
+    if not to_purge:
+        return True
+    event_ids, data_ids = zip(*to_purge)
+    event_ids_set = set(event_ids)
     _LOGGER.debug(
-        "Selected %s event_ids to remove that should be filtered", len(event_ids)
+        "Selected %s event_ids to remove that should be filtered", len(event_ids_set)
     )
     states: list[Row[tuple[int]]] = (
-        session.query(States.state_id).filter(States.event_id.in_(event_ids)).all()
+        session.query(States.state_id).filter(States.event_id.in_(event_ids_set)).all()
     )
-    state_ids: set[int] = {state.state_id for state in states}
-    _purge_state_ids(instance, session, state_ids)
-    _purge_event_ids(session, event_ids)
+    if states:
+        # These are legacy states that are linked to an event that are no longer
+        # created but since we did not remove them when we stopped adding new ones
+        # we will need to purge them here.
+        state_ids: set[int] = {state.state_id for state in states}
+        _purge_state_ids(instance, session, state_ids)
+    _purge_event_ids(session, event_ids_set)
     if unused_data_ids_set := _select_unused_event_data_ids(
         session, set(data_ids), database_engine
     ):
         _purge_batch_data_ids(instance, session, unused_data_ids_set)
-    if EVENT_STATE_CHANGED in excluded_event_types:
-        session.query(StateAttributes).delete(synchronize_session=False)
-        instance._state_attributes_ids = {}  # pylint: disable=protected-access
+    return False


-@retryable_database_job("purge")
-def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
+@retryable_database_job("purge_entity_data")
+def purge_entity_data(
+    instance: Recorder, entity_filter: Callable[[str], bool], purge_before: datetime
+) -> bool:
     """Purge states and events of specified entities."""
     database_engine = instance.database_engine
     assert database_engine is not None
+    purge_before_timestamp = purge_before.timestamp()
     with session_scope(session=instance.get_session()) as session:
         selected_metadata_ids: list[str] = [
             metadata_id
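The two helpers above now share one contract: each call deletes at most SQLITE_MAX_BIND_VARS rows older than a fixed cutoff and returns True only when a pass finds nothing left, so the caller keeps scheduling passes until every helper reports completion. A minimal, self-contained sketch of that contract in plain Python (list stand-ins, not the recorder's SQLAlchemy queries):

    BATCH_SIZE = 500  # stands in for SQLITE_MAX_BIND_VARS


    def purge_batch(rows: list[float], cutoff: float) -> tuple[list[float], bool]:
        """Drop one bounded batch of rows older than cutoff; True means done."""
        batch = [ts for ts in rows if ts < cutoff][:BATCH_SIZE]
        if not batch:
            return rows, True  # nothing older than the cutoff remains
        remaining = rows.copy()
        for ts in batch:
            remaining.remove(ts)  # drop exactly one row per selected timestamp
        return remaining, False


    def purge_until_done(rows: list[float], cutoff: float) -> list[float]:
        """Repeat passes until done, like the recorder re-queueing its purge task."""
        done = False
        while not done:
            rows, done = purge_batch(rows, cutoff)
        return rows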
@@ -724,12 +758,18 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
             if entity_filter(entity_id)
         ]
         _LOGGER.debug("Purging entity data for %s", selected_metadata_ids)
-        if len(selected_metadata_ids) > 0:
-            # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states
-            # or events record.
-            _purge_filtered_states(
-                instance, session, selected_metadata_ids, database_engine
-            )
+        if not selected_metadata_ids:
+            return True
+
+        # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states
+        # or events record.
+        if not _purge_filtered_states(
+            instance,
+            session,
+            selected_metadata_ids,
+            database_engine,
+            purge_before_timestamp,
+        ):
             _LOGGER.debug("Purging entity data hasn't fully completed yet")
             return False

@@ -71,7 +71,8 @@ def _async_register_purge_entities_service(
         domains = service.data.get(ATTR_DOMAINS, [])
         entity_globs = service.data.get(ATTR_ENTITY_GLOBS, [])
         entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs)
-        instance.queue_task(PurgeEntitiesTask(entity_filter))
+        purge_before = dt_util.utcnow()
+        instance.queue_task(PurgeEntitiesTask(entity_filter, purge_before))

     hass.services.async_register(
         DOMAIN,
@@ -114,13 +114,14 @@ class PurgeEntitiesTask(RecorderTask):
     """Object to store entity information about purge task."""

     entity_filter: Callable[[str], bool]
+    purge_before: datetime

     def run(self, instance: Recorder) -> None:
         """Purge entities from the database."""
-        if purge.purge_entity_data(instance, self.entity_filter):
+        if purge.purge_entity_data(instance, self.entity_filter, self.purge_before):
             return
         # Schedule a new purge task if this one didn't finish
-        instance.queue_task(PurgeEntitiesTask(self.entity_filter))
+        instance.queue_task(PurgeEntitiesTask(self.entity_filter, self.purge_before))


 @dataclass
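Note that the cutoff is captured once and travels with the task, so every re-queued pass purges toward the same fixed point in time rather than a moving "now". The flow, using the names from the two hunks above:

    purge_before = dt_util.utcnow()  # cutoff fixed at service-call time
    instance.queue_task(PurgeEntitiesTask(entity_filter, purge_before))
    # run() re-queues PurgeEntitiesTask with the same purge_before until
    # purge_entity_data reports everything older than the cutoff is gone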
@@ -100,7 +100,7 @@ def _default_recorder(hass):
         db_max_retries=10,
         db_retry_wait=3,
         entity_filter=CONFIG_SCHEMA({DOMAIN: {}}),
-        exclude_t=[],
+        exclude_event_types=set(),
         exclude_attributes_by_domain={},
     )

@@ -26,6 +26,7 @@ from homeassistant.components.recorder.db_schema import (
     StatisticsShortTerm,
 )
 from homeassistant.components.recorder.purge import purge_old_data
+from homeassistant.components.recorder.queries import select_event_type_ids
 from homeassistant.components.recorder.services import (
     SERVICE_PURGE,
     SERVICE_PURGE_ENTITIES,
|
|||||||
"""Convert pending states to use states_metadata."""
|
"""Convert pending states to use states_metadata."""
|
||||||
entity_ids: set[str] = set()
|
entity_ids: set[str] = set()
|
||||||
states: set[States] = set()
|
states: set[States] = set()
|
||||||
|
states_meta_objects: dict[str, StatesMeta] = {}
|
||||||
for object in session:
|
for object in session:
|
||||||
states_meta_objects: dict[str, StatesMeta] = {}
|
|
||||||
if isinstance(object, States):
|
if isinstance(object, States):
|
||||||
entity_ids.add(object.entity_id)
|
entity_ids.add(object.entity_id)
|
||||||
states.add(object)
|
states.add(object)
|
||||||
@@ -697,6 +698,33 @@ def _convert_pending_states_to_meta(instance: Recorder, session: Session) -> Non
             state.states_meta_rel = states_meta_objects[entity_id]


+def _convert_pending_events_to_event_types(
+    instance: Recorder, session: Session
+) -> None:
+    """Convert pending events to use event_type_ids."""
+    event_types: set[str] = set()
+    events: set[Events] = set()
+    event_types_objects: dict[str, EventTypes] = {}
+    for object in session:
+        if isinstance(object, Events):
+            event_types.add(object.event_type)
+            events.add(object)
+
+    event_type_to_event_type_ids = instance.event_type_manager.get_many(
+        event_types, session
+    )
+
+    for event in events:
+        event_type = event.event_type
+        event.event_type = None
+        if event_type_id := event_type_to_event_type_ids.get(event_type):
+            event.event_type_id = event_type_id
+            continue
+        if event_type not in event_types_objects:
+            event_types_objects[event_type] = EventTypes(event_type=event_type)
+        event.event_type_rel = event_types_objects[event_type]
+
+
 @pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
 async def test_purge_filtered_states(
     async_setup_recorder_instance: RecorderInstanceGenerator,
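The new `_convert_pending_events_to_event_types` helper gives test fixtures the new-format shape: it nulls the legacy `Events.event_type` string and attaches an existing `event_type_id` or a pending `EventTypes` row instead. The tests need it because the rewritten purge queries match on `Events.event_type_id`, which plain `Events(...)` fixtures would otherwise leave unset.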
@@ -850,12 +878,24 @@ async def test_purge_filtered_states(
         )
         assert states_sensor_excluded.count() == 0

-        assert session.query(States).get(72).old_state_id is None
-        assert session.query(States).get(72).attributes_id == 71
-        assert session.query(States).get(73).old_state_id is None
-        assert session.query(States).get(73).attributes_id == 71
+        assert (
+            session.query(States).filter(States.state_id == 72).first().old_state_id
+            is None
+        )
+        assert (
+            session.query(States).filter(States.state_id == 72).first().attributes_id
+            == 71
+        )
+        assert (
+            session.query(States).filter(States.state_id == 73).first().old_state_id
+            is None
+        )
+        assert (
+            session.query(States).filter(States.state_id == 73).first().attributes_id
+            == 71
+        )

-        final_keep_state = session.query(States).get(74)
+        final_keep_state = session.query(States).filter(States.state_id == 74).first()
         assert final_keep_state.old_state_id == 62  # should have been kept
         assert final_keep_state.attributes_id == 71

@@ -867,7 +907,7 @@ async def test_purge_filtered_states(
     await async_wait_purge_done(hass)

     with session_scope(hass=hass) as session:
-        final_keep_state = session.query(States).get(74)
+        final_keep_state = session.query(States).filter(States.state_id == 74).first()
         assert final_keep_state.old_state_id == 62  # should have been kept
         assert final_keep_state.attributes_id == 71

@@ -1022,7 +1062,7 @@ async def test_purge_filtered_events(
 ) -> None:
     """Test filtered events are purged."""
     config: ConfigType = {"exclude": {"event_types": ["EVENT_PURGE"]}}
-    await async_setup_recorder_instance(hass, config)
+    instance = await async_setup_recorder_instance(hass, config)

     def _add_db_entries(hass: HomeAssistant) -> None:
         with session_scope(hass=hass) as session:
@@ -1050,14 +1090,17 @@ async def test_purge_filtered_events(
                     timestamp,
                     event_id,
                 )
+            _convert_pending_events_to_event_types(instance, session)

     service_data = {"keep_days": 10}
     _add_db_entries(hass)

     with session_scope(hass=hass) as session:
-        events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE")
+        events_purge = session.query(Events).filter(
+            Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
+        )
         events_keep = session.query(Events).filter(
-            Events.event_type == EVENT_STATE_CHANGED
+            Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
         )
         states = session.query(States)

@@ -1073,9 +1116,11 @@ async def test_purge_filtered_events(
     await async_wait_purge_done(hass)

     with session_scope(hass=hass) as session:
-        events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE")
+        events_purge = session.query(Events).filter(
+            Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
+        )
         events_keep = session.query(Events).filter(
-            Events.event_type == EVENT_STATE_CHANGED
+            Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
         )
         states = session.query(States)
         assert events_purge.count() == 60
@@ -1094,9 +1139,11 @@ async def test_purge_filtered_events(
     await async_wait_purge_done(hass)

     with session_scope(hass=hass) as session:
-        events_purge = session.query(Events).filter(Events.event_type == "EVENT_PURGE")
+        events_purge = session.query(Events).filter(
+            Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
+        )
         events_keep = session.query(Events).filter(
-            Events.event_type == EVENT_STATE_CHANGED
+            Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
         )
         states = session.query(States)
         assert events_purge.count() == 0
@@ -1109,10 +1156,18 @@ async def test_purge_filtered_events_state_changed(
     hass: HomeAssistant,
 ) -> None:
     """Test filtered state_changed events are purged. This should also remove all states."""
-    config: ConfigType = {"exclude": {"event_types": [EVENT_STATE_CHANGED]}}
+    config: ConfigType = {
+        "exclude": {
+            "event_types": ["excluded_event"],
+            "entities": ["sensor.excluded", "sensor.old_format"],
+        }
+    }
     instance = await async_setup_recorder_instance(hass, config)
     # Assert entity_id is NOT excluded
-    assert instance.entity_filter("sensor.excluded") is True
+    assert instance.entity_filter("sensor.excluded") is False
+    assert instance.entity_filter("sensor.old_format") is False
+    assert instance.entity_filter("sensor.keep") is True
+    assert "excluded_event" in instance.exclude_event_types

     def _add_db_entries(hass: HomeAssistant) -> None:
         with session_scope(hass=hass) as session:
@@ -1167,34 +1222,56 @@ async def test_purge_filtered_events_state_changed(
                 old_state_id=62,  # keep
             )
             session.add_all((state_1, state_2, state_3))
+            session.add(
+                Events(
+                    event_id=231,
+                    event_type="excluded_event",
+                    event_data="{}",
+                    origin="LOCAL",
+                    time_fired_ts=dt_util.utc_to_timestamp(timestamp),
+                )
+            )
+            session.add(
+                States(
+                    entity_id="sensor.old_format",
+                    state="remove",
+                    attributes="{}",
+                    last_changed_ts=dt_util.utc_to_timestamp(timestamp),
+                    last_updated_ts=dt_util.utc_to_timestamp(timestamp),
+                )
+            )
+            _convert_pending_events_to_event_types(instance, session)
+            _convert_pending_states_to_meta(instance, session)

     service_data = {"keep_days": 10, "apply_filter": True}
     _add_db_entries(hass)

     with session_scope(hass=hass) as session:
-        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
+        events_keep = session.query(Events).filter(
+            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
+        )
         events_purge = session.query(Events).filter(
-            Events.event_type == EVENT_STATE_CHANGED
+            Events.event_type_id.in_(select_event_type_ids(("excluded_event",)))
         )
         states = session.query(States)

         assert events_keep.count() == 10
-        assert events_purge.count() == 60
-        assert states.count() == 63
+        assert events_purge.count() == 1
+        assert states.count() == 64

     await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
     await hass.async_block_till_done()

-    await async_recorder_block_till_done(hass)
-    await async_wait_purge_done(hass)
-    await async_recorder_block_till_done(hass)
-    await async_wait_purge_done(hass)
+    for _ in range(4):
+        await async_recorder_block_till_done(hass)
+        await async_wait_purge_done(hass)

     with session_scope(hass=hass) as session:
-        events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
+        events_keep = session.query(Events).filter(
+            Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
+        )
         events_purge = session.query(Events).filter(
-            Events.event_type == EVENT_STATE_CHANGED
+            Events.event_type_id.in_(select_event_type_ids(("excluded_event",)))
         )
         states = session.query(States)

@@ -1202,9 +1279,18 @@ async def test_purge_filtered_events_state_changed(
         assert events_purge.count() == 0
         assert states.count() == 3

-        assert session.query(States).get(61).old_state_id is None
-        assert session.query(States).get(62).old_state_id is None
-        assert session.query(States).get(63).old_state_id == 62  # should have been kept
+        assert (
+            session.query(States).filter(States.state_id == 61).first().old_state_id
+            is None
+        )
+        assert (
+            session.query(States).filter(States.state_id == 62).first().old_state_id
+            is None
+        )
+        assert (
+            session.query(States).filter(States.state_id == 63).first().old_state_id
+            == 62
+        )  # should have been kept


 async def test_purge_entities(
@@ -1330,7 +1416,7 @@ async def test_purge_entities(

     _add_purge_records(hass)

-    # Confirm calling service without arguments matches all records (default filter behaviour)
+    # Confirm calling service without arguments matches all records (default filter behavior)
     with session_scope(hass=hass) as session:
         states = session.query(States)
         assert states.count() == 190