Reduce overhead of legacy database columns on new installs (#90246)

* Reduce overhead of legacy database columns on new installs

* Reduce overhead of legacy database columns on new installs

* Reduce overhead of legacy database columns on new installs

* Reduce overhead of legacy database columns on new installs

* not working as expected

* override the type compiler

* override the type compiler

* override the type compiler

* override the type compiler

* Apply suggestions from code review

* pgsql char1

* make entity filter test setup with old schema

* fix some more tests that were mutating state

* fix some more tests that were mutating state

* fix some more tests that were mutating state

* fix more dbstate mutations

* add shim for older tests

* split migration tests

* add coverage for purging legacy data

* tweak

* more fixes

* drop some legacy

* fix another test

* fix a few more

* add casts for postgresql in case someone deletes the schema changes table

* dry

* dry

* dry
J. Nick Koston 2023-04-10 04:08:46 -10:00 committed by GitHub
parent 14b95ffe3a
commit 49079691d4
17 changed files with 2415 additions and 1254 deletions


@ -16,6 +16,7 @@ from typing import Any, TypeVar
import async_timeout
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
from sqlalchemy.engine import Engine
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.orm.session import Session
@ -1293,25 +1294,25 @@ class Recorder(threading.Thread):
return success
def _setup_recorder_connection(
self, dbapi_connection: DBAPIConnection, connection_record: Any
) -> None:
"""Dbapi specific connection settings."""
assert self.engine is not None
if database_engine := setup_connection_for_dialect(
self,
self.engine.dialect.name,
dbapi_connection,
not self._completed_first_database_setup,
):
self.database_engine = database_engine
self._completed_first_database_setup = True
def _setup_connection(self) -> None:
"""Ensure database is ready to fly."""
kwargs: dict[str, Any] = {}
self._completed_first_database_setup = False
def setup_recorder_connection(
dbapi_connection: Any, connection_record: Any
) -> None:
"""Dbapi specific connection settings."""
assert self.engine is not None
if database_engine := setup_connection_for_dialect(
self,
self.engine.dialect.name,
dbapi_connection,
not self._completed_first_database_setup,
):
self.database_engine = database_engine
self._completed_first_database_setup = True
if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url:
kwargs["connect_args"] = {"check_same_thread": False}
kwargs["poolclass"] = MutexPool
@ -1346,7 +1347,7 @@ class Recorder(threading.Thread):
self.engine = create_engine(self.db_url, **kwargs, future=True)
self._dialect_name = try_parse_enum(SupportedDialect, self.engine.dialect.name)
sqlalchemy_event.listen(self.engine, "connect", setup_recorder_connection)
sqlalchemy_event.listen(self.engine, "connect", self._setup_recorder_connection)
Base.metadata.create_all(self.engine)
self._get_session = scoped_session(sessionmaker(bind=self.engine, future=True))
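The hunk above hoists the nested setup_recorder_connection closure into the bound method _setup_recorder_connection, which is then registered with sqlalchemy_event.listen. A standalone sketch of the same listener pattern (illustration only, not this PR's code; the Recorder name and PRAGMA are reused for flavor):

from typing import Any

from sqlalchemy import create_engine, event


class Recorder:
    """Minimal stand-in showing a bound method used as a SQLAlchemy connect listener."""

    def __init__(self, db_url: str) -> None:
        self.engine = create_engine(db_url)
        # A bound method works directly as the listener, so no closure is needed.
        event.listen(self.engine, "connect", self._setup_recorder_connection)

    def _setup_recorder_connection(
        self, dbapi_connection: Any, connection_record: Any
    ) -> None:
        """Apply per-connection settings on every new DBAPI connection."""
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA journal_mode=WAL")
        cursor.close()


recorder = Recorder("sqlite://")
with recorder.engine.connect():
    pass  # the listener ran when the pooled DBAPI connection was created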


@ -10,6 +10,7 @@ from typing import Any, cast
import ciso8601
from fnv_hash_fast import fnv1a_32
from sqlalchemy import (
CHAR,
JSON,
BigInteger,
Boolean,
@ -28,13 +29,13 @@ from sqlalchemy import (
)
from sqlalchemy.dialects import mysql, oracle, postgresql, sqlite
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeBase, Mapped, aliased, mapped_column, relationship
from sqlalchemy.types import TypeDecorator
from typing_extensions import Self
from homeassistant.const import (
MAX_LENGTH_EVENT_CONTEXT_ID,
MAX_LENGTH_EVENT_EVENT_TYPE,
MAX_LENGTH_EVENT_ORIGIN,
MAX_LENGTH_STATE_ENTITY_ID,
MAX_LENGTH_STATE_STATE,
)
@ -135,6 +136,28 @@ _DEFAULT_TABLE_ARGS = {
}
class UnusedDateTime(DateTime):
"""An unused column type that behaves like a datetime."""
class Unused(CHAR):
"""An unused column type that behaves like a string."""
@compiles(UnusedDateTime, "mysql", "mariadb", "sqlite") # type: ignore[misc,no-untyped-call]
@compiles(Unused, "mysql", "mariadb", "sqlite") # type: ignore[misc,no-untyped-call]
def compile_char_zero(type_: TypeDecorator, compiler: Any, **kw: Any) -> str:
"""Compile UnusedDateTime and Unused as CHAR(0) on mysql, mariadb, and sqlite."""
return "CHAR(0)" # Uses 1 byte on MySQL (no change on sqlite)
@compiles(UnusedDateTime, "postgresql") # type: ignore[misc,no-untyped-call]
@compiles(Unused, "postgresql") # type: ignore[misc,no-untyped-call]
def compile_char_one(type_: TypeDecorator, compiler: Any, **kw: Any) -> str:
"""Compile UnusedDateTime and Unused as CHAR(1) on postgresql."""
return "CHAR(1)" # Uses 1 byte
class FAST_PYSQLITE_DATETIME(sqlite.DATETIME):
"""Use ciso8601 to parse datetimes instead of sqlalchemy built-in regex."""
@ -165,6 +188,8 @@ DOUBLE_TYPE = (
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
UNUSED_LEGACY_COLUMN = Unused(0)
UNUSED_LEGACY_DATETIME_COLUMN = UnusedDateTime(timezone=True)
DOUBLE_PRECISION_TYPE_SQL = "DOUBLE PRECISION"
TIMESTAMP_TYPE = DOUBLE_TYPE
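The @compiles hooks above shrink UnusedDateTime and Unused to CHAR(0) on MySQL, MariaDB, and SQLite, and to CHAR(1) on PostgreSQL, so fresh installs no longer pay for VARCHAR/DATETIME columns that new rows never populate. A minimal standalone sketch of the technique (assumes SQLAlchemy 2.x; not code from this diff):

from sqlalchemy import CHAR, Column, Integer, MetaData, Table, create_engine
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import CreateTable


class Unused(CHAR):
    """An unused column type: the ORM attribute survives, the storage shrinks."""


@compiles(Unused, "sqlite")
def _compile_unused(type_, compiler, **kw):
    # Emit the cheapest column definition this dialect accepts.
    return "CHAR(0)"


metadata = MetaData()
events = Table(
    "events",
    metadata,
    Column("event_id", Integer, primary_key=True),
    Column("event_type", Unused(0)),  # legacy column, no longer written
)

print(CreateTable(events).compile(create_engine("sqlite://")))
# New installs get "event_type CHAR(0)" instead of a full VARCHAR column;
# existing databases keep whatever column type they already have.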
@ -206,29 +231,15 @@ class Events(Base):
)
__tablename__ = TABLE_EVENTS
event_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
event_type: Mapped[str | None] = mapped_column(
String(MAX_LENGTH_EVENT_EVENT_TYPE)
) # no longer used
event_data: Mapped[str | None] = mapped_column(
Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
)
origin: Mapped[str | None] = mapped_column(
String(MAX_LENGTH_EVENT_ORIGIN)
) # no longer used for new rows
event_type: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
event_data: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
origin: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
origin_idx: Mapped[int | None] = mapped_column(SmallInteger)
time_fired: Mapped[datetime | None] = mapped_column(
DATETIME_TYPE
) # no longer used for new rows
time_fired: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
time_fired_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True)
context_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True
)
context_user_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
context_parent_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
context_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
context_user_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
context_parent_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
data_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("event_data.data_id"), index=True
)
@ -400,21 +411,13 @@ class States(Base):
)
__tablename__ = TABLE_STATES
state_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
entity_id: Mapped[str | None] = mapped_column(
String(MAX_LENGTH_STATE_ENTITY_ID)
) # no longer used for new rows
entity_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
state: Mapped[str | None] = mapped_column(String(MAX_LENGTH_STATE_STATE))
attributes: Mapped[str | None] = mapped_column(
Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
) # no longer used for new rows
event_id: Mapped[int | None] = mapped_column(Integer) # no longer used for new rows
last_changed: Mapped[datetime | None] = mapped_column(
DATETIME_TYPE
) # no longer used for new rows
attributes: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
event_id: Mapped[int | None] = mapped_column(UNUSED_LEGACY_COLUMN)
last_changed: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
last_changed_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
last_updated: Mapped[datetime | None] = mapped_column(
DATETIME_TYPE
) # no longer used for new rows
last_updated: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
last_updated_ts: Mapped[float | None] = mapped_column(
TIMESTAMP_TYPE, default=time.time, index=True
)
@ -424,15 +427,9 @@ class States(Base):
attributes_id: Mapped[int | None] = mapped_column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
)
context_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True
)
context_user_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
context_parent_id: Mapped[str | None] = mapped_column( # no longer used
String(MAX_LENGTH_EVENT_CONTEXT_ID)
)
context_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
context_user_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
context_parent_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
origin_idx: Mapped[int | None] = mapped_column(
SmallInteger
) # 0 is local, 1 is remote
@ -636,20 +633,18 @@ class StatisticsBase:
"""Statistics base class."""
id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True)
created: Mapped[datetime | None] = mapped_column(DATETIME_TYPE) # No longer used
created: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
created_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, default=time.time)
metadata_id: Mapped[int | None] = mapped_column(
Integer,
ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
)
start: Mapped[datetime | None] = mapped_column(
DATETIME_TYPE, index=True
) # No longer used
start: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
start_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True)
mean: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
min: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
max: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
last_reset: Mapped[datetime | None] = mapped_column(DATETIME_TYPE)
last_reset: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
last_reset_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
state: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
sum: Mapped[float | None] = mapped_column(DOUBLE_TYPE)


@ -1264,7 +1264,7 @@ def _migrate_columns_to_timestamp(
text(
"UPDATE events SET "
"time_fired_ts= "
"(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired) end) "
"(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired::timestamptz) end) "
"WHERE event_id IN ( "
"SELECT event_id FROM events where time_fired_ts is NULL LIMIT 250000 "
" );"
@ -1276,8 +1276,8 @@ def _migrate_columns_to_timestamp(
result = session.connection().execute(
text(
"UPDATE states set last_updated_ts="
"(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated) end), "
"last_changed_ts=EXTRACT(EPOCH FROM last_changed) "
"(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated::timestamptz) end), "
"last_changed_ts=EXTRACT(EPOCH FROM last_changed::timestamptz) "
"where state_id IN ( "
"SELECT state_id FROM states where last_updated_ts is NULL LIMIT 250000 "
" );"
@ -1344,12 +1344,12 @@ def _migrate_statistics_columns_to_timestamp(
result = session.connection().execute(
text(
f"UPDATE {table} set start_ts=" # nosec
"(case when start is NULL then 0 else EXTRACT(EPOCH FROM start) end), "
"created_ts=EXTRACT(EPOCH FROM created), "
"last_reset_ts=EXTRACT(EPOCH FROM last_reset) "
"where id IN ( "
f"SELECT id FROM {table} where start_ts is NULL LIMIT 100000 "
" );"
"(case when start is NULL then 0 else EXTRACT(EPOCH FROM start::timestamptz) end), "
"created_ts=EXTRACT(EPOCH FROM created::timestamptz), "
"last_reset_ts=EXTRACT(EPOCH FROM last_reset::timestamptz) "
"where id IN ("
f"SELECT id FROM {table} where start_ts is NULL LIMIT 100000"
");"
)
)
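The PostgreSQL migration statements above gain explicit ::timestamptz casts; per the commit notes, this keeps EXTRACT(EPOCH FROM ...) valid even if the legacy datetime columns were recreated as the new CHAR placeholders (for example after someone deletes the schema_changes table). A rough sketch of one batched statement as a helper (hypothetical function name, not code from this PR):

from sqlalchemy import text


def build_events_time_fired_backfill(limit: int = 250_000):
    """Build one batch of the UPDATE that backfills events.time_fired_ts."""
    return text(
        "UPDATE events SET "
        "time_fired_ts="
        "(case when time_fired is NULL then 0 "
        "else EXTRACT(EPOCH FROM time_fired::timestamptz) end) "
        "WHERE event_id IN ("
        f"SELECT event_id FROM events WHERE time_fired_ts is NULL LIMIT {limit}"
        ");"
    )


# The migration loops on this until no rows remain, mirroring the 250000-row
# batching shown in the diff above.
print(build_events_time_fired_backfill())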


@ -20,6 +20,7 @@ from awesomeversion import (
import ciso8601
from sqlalchemy import inspect, text
from sqlalchemy.engine import Result, Row
from sqlalchemy.engine.interfaces import DBAPIConnection
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
@ -344,14 +345,14 @@ def move_away_broken_database(dbfile: str) -> None:
os.rename(path, f"{path}{corrupt_postfix}")
def execute_on_connection(dbapi_connection: Any, statement: str) -> None:
def execute_on_connection(dbapi_connection: DBAPIConnection, statement: str) -> None:
"""Execute a single statement with a dbapi connection."""
cursor = dbapi_connection.cursor()
cursor.execute(statement)
cursor.close()
def query_on_connection(dbapi_connection: Any, statement: str) -> Any:
def query_on_connection(dbapi_connection: DBAPIConnection, statement: str) -> Any:
"""Execute a single statement with a dbapi connection and return the result."""
cursor = dbapi_connection.cursor()
cursor.execute(statement)
@ -457,7 +458,7 @@ def _async_create_mariadb_range_index_regression_issue(
def setup_connection_for_dialect(
instance: Recorder,
dialect_name: str,
dbapi_connection: Any,
dbapi_connection: DBAPIConnection,
first_connection: bool,
) -> DatabaseEngine | None:
"""Execute statements needed for dialect connection."""
@ -465,10 +466,10 @@ def setup_connection_for_dialect(
slow_range_in_select = False
if dialect_name == SupportedDialect.SQLITE:
if first_connection:
old_isolation = dbapi_connection.isolation_level
dbapi_connection.isolation_level = None
old_isolation = dbapi_connection.isolation_level # type: ignore[attr-defined]
dbapi_connection.isolation_level = None # type: ignore[attr-defined]
execute_on_connection(dbapi_connection, "PRAGMA journal_mode=WAL")
dbapi_connection.isolation_level = old_isolation
dbapi_connection.isolation_level = old_isolation # type: ignore[attr-defined]
# WAL mode only needs to be setup once
# instead of every time we open the sqlite connection
# as its persistent and isn't free to call every time.


@ -2,9 +2,13 @@
from __future__ import annotations
import asyncio
from collections.abc import Iterable
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import partial
import importlib
import sys
import time
from typing import Any, Literal, cast
from unittest.mock import patch, sentinel
@ -14,8 +18,14 @@ from sqlalchemy.orm.session import Session
from homeassistant import core as ha
from homeassistant.components import recorder
from homeassistant.components.recorder import Recorder, get_instance, statistics
from homeassistant.components.recorder.db_schema import RecorderRuns
from homeassistant.components.recorder import Recorder, core, get_instance, statistics
from homeassistant.components.recorder.db_schema import (
Events,
EventTypes,
RecorderRuns,
States,
StatesMeta,
)
from homeassistant.components.recorder.tasks import RecorderTask, StatisticsTask
from homeassistant.const import UnitOfTemperature
from homeassistant.core import Event, HomeAssistant, State
@ -24,6 +34,7 @@ import homeassistant.util.dt as dt_util
from . import db_schema_0
DEFAULT_PURGE_TASKS = 3
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
@dataclass
@ -307,3 +318,118 @@ def record_states(hass):
states[sns4].append(set_state(sns4, "20", attributes=sns4_attr))
return zero, four, states
def convert_pending_states_to_meta(instance: Recorder, session: Session) -> None:
"""Convert pending states to use states_metadata."""
entity_ids: set[str] = set()
states: set[States] = set()
states_meta_objects: dict[str, StatesMeta] = {}
for object in session:
if isinstance(object, States):
entity_ids.add(object.entity_id)
states.add(object)
entity_id_to_metadata_ids = instance.states_meta_manager.get_many(
entity_ids, session, True
)
for state in states:
entity_id = state.entity_id
state.entity_id = None
state.attributes = None
state.event_id = None
if metadata_id := entity_id_to_metadata_ids.get(entity_id):
state.metadata_id = metadata_id
continue
if entity_id not in states_meta_objects:
states_meta_objects[entity_id] = StatesMeta(entity_id=entity_id)
state.states_meta_rel = states_meta_objects[entity_id]
def convert_pending_events_to_event_types(instance: Recorder, session: Session) -> None:
"""Convert pending events to use event_type_ids."""
event_types: set[str] = set()
events: set[Events] = set()
event_types_objects: dict[str, EventTypes] = {}
for object in session:
if isinstance(object, Events):
event_types.add(object.event_type)
events.add(object)
event_type_to_event_type_ids = instance.event_type_manager.get_many(
event_types, session
)
for event in events:
event_type = event.event_type
event.event_type = None
event.event_data = None
event.origin = None
if event_type_id := event_type_to_event_type_ids.get(event_type):
event.event_type_id = event_type_id
continue
if event_type not in event_types_objects:
event_types_objects[event_type] = EventTypes(event_type=event_type)
event.event_type_rel = event_types_objects[event_type]
def create_engine_test_for_schema_version_postfix(
*args, schema_version_postfix: str, **kwargs
):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
schema_module = get_schema_module_path(schema_version_postfix)
importlib.import_module(schema_module)
old_db_schema = sys.modules[schema_module]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
def get_schema_module_path(schema_version_postfix: str) -> str:
"""Return the path to the schema module."""
return f"tests.components.recorder.db_schema_{schema_version_postfix}"
@contextmanager
def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
"""Fixture to initialize the db with the old schema."""
schema_module = get_schema_module_path(schema_version_postfix)
importlib.import_module(schema_module)
old_db_schema = sys.modules[schema_module]
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
core, "EventTypes", old_db_schema.EventTypes
), patch.object(
core, "EventData", old_db_schema.EventData
), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch.object(
core, "StateAttributes", old_db_schema.StateAttributes
), patch.object(
core, "EntityIDMigrationTask", core.RecorderTask
), patch(
CREATE_ENGINE_TARGET,
new=partial(
create_engine_test_for_schema_version_postfix,
schema_version_postfix=schema_version_postfix,
),
):
yield
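The old_db_schema context manager above centralizes the schema-pinning boilerplate that the legacy test modules previously duplicated. A test module now opts into an old schema with a one-line autouse fixture, mirroring the fixtures added later in this diff (sketch):

import pytest

from .common import old_db_schema


# This test module is for schema 32 and below.
@pytest.fixture(autouse=True)
def db_schema_32():
    """Fixture to initialize the db with the old schema 32."""
    with old_db_schema("32"):
        yield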


@ -562,16 +562,19 @@ class StatisticsBase:
id = Column(Integer, Identity(), primary_key=True)
created = Column(DATETIME_TYPE, default=dt_util.utcnow)
created_ts = Column(TIMESTAMP_TYPE, default=time.time)
metadata_id = Column(
Integer,
ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
index=True,
)
start = Column(DATETIME_TYPE, index=True)
start_ts = Column(TIMESTAMP_TYPE, index=True)
mean = Column(DOUBLE_TYPE)
min = Column(DOUBLE_TYPE)
max = Column(DOUBLE_TYPE)
last_reset = Column(DATETIME_TYPE)
last_reset_ts = Column(TIMESTAMP_TYPE)
state = Column(DOUBLE_TYPE)
sum = Column(DOUBLE_TYPE)


@ -1,4 +1,5 @@
"""The tests for the recorder filter matching the EntityFilter component."""
# pylint: disable=invalid-name
import json
from unittest.mock import patch
@ -25,7 +26,15 @@ from homeassistant.helpers.entityfilter import (
convert_include_exclude_filter,
)
from .common import async_wait_recording_done
from .common import async_wait_recording_done, old_db_schema
# This test is for schema 37 and below (32 is new enough to test)
@pytest.fixture(autouse=True)
def db_schema_32():
"""Fixture to initialize the db with the old schema 32."""
with old_db_schema("32"):
yield
@pytest.fixture(name="legacy_recorder_mock")


@ -59,7 +59,7 @@ async def _async_get_states(
klass = LazyStatePreSchema31
else:
klass = LazyState
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
attr_cache = {}
return [
klass(row, attr_cache, None)
@ -157,7 +157,7 @@ def test_get_full_significant_states_with_session_entity_no_matches(
hass = hass_recorder()
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
assert (
history.get_full_significant_states_with_session(
hass, session, time_before_recorder_ran, now, entity_ids=["demo.id"]
@ -183,7 +183,7 @@ def test_significant_states_with_session_entity_minimal_response_no_matches(
hass = hass_recorder()
now = dt_util.utcnow()
time_before_recorder_ran = now - timedelta(days=1000)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
assert (
history.get_significant_states_with_session(
hass,
@ -217,7 +217,7 @@ def test_significant_states_with_session_single_entity(
hass.states.set("demo.id", "any2", {"attr": True})
wait_recording_done(hass)
now = dt_util.utcnow()
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = history.get_significant_states_with_session(
hass,
session,
@ -1031,7 +1031,7 @@ async def test_get_full_significant_states_handles_empty_last_changed(
await async_wait_recording_done(hass)
def _get_entries():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
return history.get_full_significant_states_with_session(
hass,
session,
@ -1049,7 +1049,7 @@ async def test_get_full_significant_states_handles_empty_last_changed(
assert sensor_one_states[0].last_updated != sensor_one_states[1].last_updated
def _fetch_native_states() -> list[State]:
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
native_states = []
db_state_attributes = {
state_attributes.attributes_id: state_attributes
@ -1085,7 +1085,7 @@ async def test_get_full_significant_states_handles_empty_last_changed(
)
def _fetch_db_states() -> list[States]:
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = list(session.query(States))
session.expunge_all()
return states
@ -1151,7 +1151,7 @@ async def test_get_full_significant_states_past_year_2038(
await async_wait_recording_done(hass)
def _get_entries():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
return history.get_full_significant_states_with_session(
hass,
session,


@ -6,17 +6,13 @@ from collections.abc import Callable
# pylint: disable=invalid-name
from copy import copy
from datetime import datetime, timedelta
import importlib
import json
import sys
from unittest.mock import patch, sentinel
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import core, history, statistics
from homeassistant.components.recorder import history
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.util import session_scope
@ -29,58 +25,15 @@ from .common import (
assert_multiple_states_equal_without_context,
assert_multiple_states_equal_without_context_and_last_changed,
assert_states_equal_without_context,
old_db_schema,
wait_recording_done,
)
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_30"
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
@pytest.fixture(autouse=True)
def db_schema_30():
"""Fixture to initialize the db with the old schema."""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
core, "EventTypes", old_db_schema.EventTypes
), patch.object(
core, "EventData", old_db_schema.EventData
), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch.object(
core, "StateAttributes", old_db_schema.StateAttributes
), patch.object(
core, "EntityIDMigrationTask", core.RecorderTask
), patch(
CREATE_ENGINE_TARGET, new=_create_engine_test
):
"""Fixture to initialize the db with the old schema 30."""
with old_db_schema("30"):
yield


@ -6,17 +6,13 @@ from collections.abc import Callable
# pylint: disable=invalid-name
from copy import copy
from datetime import datetime, timedelta
import importlib
import json
import sys
from unittest.mock import patch, sentinel
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import core, history, statistics
from homeassistant.components.recorder import history
from homeassistant.components.recorder.filters import Filters
from homeassistant.components.recorder.models import process_timestamp
from homeassistant.components.recorder.util import session_scope
@ -29,58 +25,15 @@ from .common import (
assert_multiple_states_equal_without_context,
assert_multiple_states_equal_without_context_and_last_changed,
assert_states_equal_without_context,
old_db_schema,
wait_recording_done,
)
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_32"
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
@pytest.fixture(autouse=True)
def db_schema_32():
"""Fixture to initialize the db with the old schema."""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
core, "EventTypes", old_db_schema.EventTypes
), patch.object(
core, "EventData", old_db_schema.EventData
), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch.object(
core, "StateAttributes", old_db_schema.StateAttributes
), patch.object(
core, "EntityIDMigrationTask", core.RecorderTask
), patch(
CREATE_ENGINE_TARGET, new=_create_engine_test
):
"""Fixture to initialize the db with the old schema 32."""
with old_db_schema("32"):
yield


@ -75,6 +75,7 @@ from homeassistant.util.json import json_loads
from .common import (
async_block_recorder,
async_wait_recording_done,
convert_pending_states_to_meta,
corrupt_db_file,
run_information_with_session,
wait_recording_done,
@ -187,7 +188,7 @@ async def test_shutdown_closes_connections(
pool.shutdown = Mock()
def _ensure_connected():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
list(session.query(States))
await instance.async_add_executor_job(_ensure_connected)
@ -221,7 +222,7 @@ async def test_state_gets_saved_when_set_before_start_event(
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id is None
@ -237,7 +238,7 @@ async def test_saving_state(recorder_mock: Recorder, hass: HomeAssistant) -> Non
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = []
for db_state, db_state_attributes, states_meta in (
session.query(States, StateAttributes, StatesMeta)
@ -278,7 +279,7 @@ async def test_saving_state_with_nul(
hass.states.async_set(entity_id, state, attributes)
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = []
for db_state, db_state_attributes, states_meta in (
session.query(States, StateAttributes, StatesMeta)
@ -321,7 +322,7 @@ async def test_saving_many_states(
assert expire_all.called
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) == 6
assert db_states[0].event_id is None
@ -345,7 +346,7 @@ async def test_saving_state_with_intermixed_time_changes(
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) == 2
assert db_states[0].event_id is None
@ -385,7 +386,7 @@ def test_saving_state_with_exception(
hass.states.set(entity_id, state, attributes)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) >= 1
@ -426,7 +427,7 @@ def test_saving_state_with_sqlalchemy_exception(
hass.states.set(entity_id, state, attributes)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) >= 1
@ -494,7 +495,7 @@ def test_saving_event(hass_recorder: Callable[..., HomeAssistant]) -> None:
get_instance(hass).block_till_done()
events: list[Event] = []
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
for select_event, event_data, event_types in (
session.query(Events, EventData, EventTypes)
.filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
@ -537,7 +538,7 @@ def test_saving_state_with_commit_interval_zero(
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id is None
@ -563,6 +564,7 @@ def _add_entities(hass, entity_ids):
native_state = db_state.to_native()
native_state.attributes = db_state_attributes.to_native()
states.append(native_state)
convert_pending_states_to_meta(get_instance(hass), session)
return states
@ -647,7 +649,7 @@ async def test_saving_event_exclude_event_type(
await async_wait_recording_done(hass)
def _get_events(hass: HomeAssistant, event_types: list[str]) -> list[Event]:
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
events = []
for event, event_data, event_types in (
session.query(Events, EventData, EventTypes)
@ -777,7 +779,7 @@ def test_saving_state_and_removing_entity(
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = list(
session.query(StatesMeta.entity_id, States.state)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
@ -804,7 +806,7 @@ def test_saving_state_with_oversized_attributes(
wait_recording_done(hass)
states = []
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
for db_state, db_state_attributes, states_meta in (
session.query(States, StateAttributes, StatesMeta)
.outerjoin(
@ -838,7 +840,7 @@ def test_saving_event_with_oversized_data(
wait_recording_done(hass)
events = {}
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
for _, data, event_type in (
session.query(Events.event_id, EventData.shared_data, EventTypes.event_type)
.outerjoin(EventData, Events.data_id == EventData.data_id)
@ -864,7 +866,7 @@ def test_saving_event_invalid_context_ulid(
wait_recording_done(hass)
events = {}
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
for _, data, event_type in (
session.query(Events.event_id, EventData.shared_data, EventTypes.event_type)
.outerjoin(EventData, Events.data_id == EventData.data_id)
@ -1267,7 +1269,7 @@ def test_statistics_runs_initiated(hass_recorder: Callable[..., HomeAssistant])
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
statistics_runs = list(session.query(StatisticsRuns))
assert len(statistics_runs) == 1
last_run = process_timestamp(statistics_runs[0].start)
@ -1292,7 +1294,7 @@ def test_compile_missing_statistics(
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
statistics_runs = list(session.query(StatisticsRuns))
assert len(statistics_runs) == 1
last_run = process_timestamp(statistics_runs[0].start)
@ -1330,7 +1332,7 @@ def test_compile_missing_statistics(
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
statistics_runs = list(session.query(StatisticsRuns))
assert len(statistics_runs) == 13 # 12 5-minute runs
last_run = process_timestamp(statistics_runs[1].start)
@ -1355,7 +1357,7 @@ def test_saving_sets_old_state(hass_recorder: Callable[..., HomeAssistant]) -> N
hass.states.set("test.two", "s4", {})
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = list(
session.query(
StatesMeta.entity_id, States.state_id, States.old_state_id, States.state
@ -1389,7 +1391,7 @@ def test_saving_state_with_serializable_data(
hass.states.set("test.two", "s3", {})
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = list(
session.query(
StatesMeta.entity_id, States.state_id, States.old_state_id, States.state
@ -1447,7 +1449,7 @@ def test_service_disable_events_not_recording(
assert len(events) == 1
event = events[0]
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_events = list(
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
@ -1471,7 +1473,7 @@ def test_service_disable_events_not_recording(
assert events[0].data != events[1].data
db_events = []
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
for select_event, event_data, event_types in (
session.query(Events, EventData, EventTypes)
.filter(Events.event_type_id.in_(select_event_type_ids((event_type,))))
@ -1515,7 +1517,7 @@ def test_service_disable_states_not_recording(
hass.states.set("test.one", "on", {})
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
assert len(list(session.query(States))) == 0
assert hass.services.call(
@ -1528,7 +1530,7 @@ def test_service_disable_states_not_recording(
hass.states.set("test.two", "off", {})
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id is None
@ -1550,7 +1552,7 @@ def test_service_disable_run_information_recorded(tmpdir: py.path.local) -> None
hass.start()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_run_info = list(session.query(RecorderRuns))
assert len(db_run_info) == 1
assert db_run_info[0].start is not None
@ -1572,7 +1574,7 @@ def test_service_disable_run_information_recorded(tmpdir: py.path.local) -> None
hass.start()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_run_info = list(session.query(RecorderRuns))
assert len(db_run_info) == 2
assert db_run_info[0].start is not None
@ -1642,7 +1644,7 @@ async def test_database_corruption_while_running(
await async_wait_recording_done(hass)
def _get_last_state():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
db_states[0].entity_id = "test.two"
@ -1680,7 +1682,7 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None:
hass.bus.fire("hello", data)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_events = list(
session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(event_types))
@ -1695,7 +1697,7 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None:
hass.bus.fire("hello", data)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_events = list(
session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(event_types))
@ -1729,7 +1731,7 @@ async def test_database_lock_and_unlock(
event_types = (event_type,)
def _get_db_events():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
return list(
session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(event_types))
@ -1784,7 +1786,7 @@ async def test_database_lock_and_overflow(
event_types = (event_type,)
def _get_db_events():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
return list(
session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(event_types))
@ -1919,7 +1921,7 @@ def test_deduplication_event_data_inside_commit_interval(
hass.bus.fire("this_event", {"de": "dupe"})
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
event_types = ("this_event",)
events = list(
session.query(Events)
@ -1960,7 +1962,7 @@ def test_deduplication_state_attributes_inside_commit_interval(
hass.states.set(entity_id, "off", attributes)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = list(
session.query(States).outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
@ -1986,7 +1988,7 @@ async def test_async_block_till_done(
hass.states.async_set(entity_id, "off", attributes)
def _fetch_states():
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
return list(session.query(States))
await async_block_recorder(hass, 0.1)
@ -2194,7 +2196,7 @@ async def test_excluding_attributes_by_integration(
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = []
for db_state, db_state_attributes, states_meta in (
session.query(States, StateAttributes, StatesMeta)


@ -6,10 +6,9 @@ import sqlite3
import sys
import threading
from unittest.mock import Mock, PropertyMock, call, patch
import uuid
import pytest
from sqlalchemy import create_engine, inspect, text
from sqlalchemy import create_engine, text
from sqlalchemy.exc import (
DatabaseError,
InternalError,
@ -25,40 +24,23 @@ from homeassistant.components import persistent_notification as pn, recorder
from homeassistant.components.recorder import db_schema, migration
from homeassistant.components.recorder.db_schema import (
SCHEMA_VERSION,
Events,
EventTypes,
RecorderRuns,
States,
StatesMeta,
)
from homeassistant.components.recorder.queries import select_event_type_ids
from homeassistant.components.recorder.tasks import (
EntityIDMigrationTask,
EntityIDPostMigrationTask,
EventsContextIDMigrationTask,
EventTypeIDMigrationTask,
StatesContextIDMigrationTask,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant
from homeassistant.helpers import recorder as recorder_helper
import homeassistant.util.dt as dt_util
from homeassistant.util.ulid import bytes_to_ulid
from .common import (
async_recorder_block_till_done,
async_wait_recording_done,
create_engine_test,
)
from .common import async_wait_recording_done, create_engine_test
from tests.common import async_fire_time_changed
from tests.typing import RecorderInstanceGenerator
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
def _get_native_states(hass, entity_id):
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
instance = recorder.get_instance(hass)
metadata_id = instance.states_meta_manager.get(entity_id, session, True)
states = []
@ -364,7 +346,7 @@ async def test_schema_migrate(
# Check and report the outcome of the migration; if migration fails
# the recorder will silently create a new database.
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
res = (
session.query(db_schema.SchemaChanges)
.order_by(db_schema.SchemaChanges.change_id.desc())
@ -601,654 +583,3 @@ def test_raise_if_exception_missing_empty_cause_str() -> None:
with pytest.raises(ProgrammingError):
migration.raise_if_exception_missing_str(programming_exc, ["not present"])
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_migrate_events_context_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex
uuid_bin = test_uuid.bytes
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
Events(
event_type="old_uuid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.452529,
context_id=uuid_hex,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="empty_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id=None,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="ulid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
context_id_bin=None,
context_user_id="9400facee45711eaa9308bfd3d19e474",
context_user_id_bin=None,
context_parent_id="01ARZ3NDEKTSV4RRFFQ69G5FA2",
context_parent_id_bin=None,
),
Events(
event_type="invalid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="invalid",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="garbage_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="adapt_lgt:b'5Cf*':interval:b'0R'",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventsContextIDMigrationTask())
await async_recorder_block_till_done(hass)
def _object_as_dict(obj):
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
def _fetch_migrated_events():
with session_scope(hass=hass) as session:
events = (
session.query(Events)
.filter(
Events.event_type.in_(
[
"old_uuid_context_id_event",
"empty_context_id_event",
"ulid_context_id_event",
"invalid_context_id_event",
"garbage_context_id_event",
]
)
)
.all()
)
assert len(events) == 5
return {event.event_type: _object_as_dict(event) for event in events}
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
assert old_uuid_context_id_event["context_id"] is None
assert old_uuid_context_id_event["context_user_id"] is None
assert old_uuid_context_id_event["context_parent_id"] is None
assert old_uuid_context_id_event["context_id_bin"] == uuid_bin
assert old_uuid_context_id_event["context_user_id_bin"] is None
assert old_uuid_context_id_event["context_parent_id_bin"] is None
empty_context_id_event = events_by_type["empty_context_id_event"]
assert empty_context_id_event["context_id"] is None
assert empty_context_id_event["context_user_id"] is None
assert empty_context_id_event["context_parent_id"] is None
assert empty_context_id_event["context_id_bin"] == b"\x00" * 16
assert empty_context_id_event["context_user_id_bin"] is None
assert empty_context_id_event["context_parent_id_bin"] is None
ulid_context_id_event = events_by_type["ulid_context_id_event"]
assert ulid_context_id_event["context_id"] is None
assert ulid_context_id_event["context_user_id"] is None
assert ulid_context_id_event["context_parent_id"] is None
assert (
bytes_to_ulid(ulid_context_id_event["context_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FAV"
)
assert (
ulid_context_id_event["context_user_id_bin"]
== b"\x94\x00\xfa\xce\xe4W\x11\xea\xa90\x8b\xfd=\x19\xe4t"
)
assert (
bytes_to_ulid(ulid_context_id_event["context_parent_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FA2"
)
invalid_context_id_event = events_by_type["invalid_context_id_event"]
assert invalid_context_id_event["context_id"] is None
assert invalid_context_id_event["context_user_id"] is None
assert invalid_context_id_event["context_parent_id"] is None
assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16
assert invalid_context_id_event["context_user_id_bin"] is None
assert invalid_context_id_event["context_parent_id_bin"] is None
garbage_context_id_event = events_by_type["garbage_context_id_event"]
assert garbage_context_id_event["context_id"] is None
assert garbage_context_id_event["context_user_id"] is None
assert garbage_context_id_event["context_parent_id"] is None
assert garbage_context_id_event["context_id_bin"] == b"\x00" * 16
assert garbage_context_id_event["context_user_id_bin"] is None
assert garbage_context_id_event["context_parent_id_bin"] is None
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_migrate_states_context_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex
uuid_bin = test_uuid.bytes
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
States(
entity_id="state.old_uuid_context_id",
last_updated_ts=1677721632.452529,
context_id=uuid_hex,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
States(
entity_id="state.empty_context_id",
last_updated_ts=1677721632.552529,
context_id=None,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
States(
entity_id="state.ulid_context_id",
last_updated_ts=1677721632.552529,
context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
context_id_bin=None,
context_user_id="9400facee45711eaa9308bfd3d19e474",
context_user_id_bin=None,
context_parent_id="01ARZ3NDEKTSV4RRFFQ69G5FA2",
context_parent_id_bin=None,
),
States(
entity_id="state.invalid_context_id",
last_updated_ts=1677721632.552529,
context_id="invalid",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
States(
entity_id="state.garbage_context_id",
last_updated_ts=1677721632.552529,
context_id="adapt_lgt:b'5Cf*':interval:b'0R'",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(StatesContextIDMigrationTask())
await async_recorder_block_till_done(hass)
def _object_as_dict(obj):
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
def _fetch_migrated_states():
with session_scope(hass=hass) as session:
events = (
session.query(States)
.filter(
States.entity_id.in_(
[
"state.old_uuid_context_id",
"state.empty_context_id",
"state.ulid_context_id",
"state.invalid_context_id",
"state.garbage_context_id",
]
)
)
.all()
)
assert len(events) == 5
return {state.entity_id: _object_as_dict(state) for state in events}
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"]
assert old_uuid_context_id["context_id"] is None
assert old_uuid_context_id["context_user_id"] is None
assert old_uuid_context_id["context_parent_id"] is None
assert old_uuid_context_id["context_id_bin"] == uuid_bin
assert old_uuid_context_id["context_user_id_bin"] is None
assert old_uuid_context_id["context_parent_id_bin"] is None
empty_context_id = states_by_entity_id["state.empty_context_id"]
assert empty_context_id["context_id"] is None
assert empty_context_id["context_user_id"] is None
assert empty_context_id["context_parent_id"] is None
assert empty_context_id["context_id_bin"] == b"\x00" * 16
assert empty_context_id["context_user_id_bin"] is None
assert empty_context_id["context_parent_id_bin"] is None
ulid_context_id = states_by_entity_id["state.ulid_context_id"]
assert ulid_context_id["context_id"] is None
assert ulid_context_id["context_user_id"] is None
assert ulid_context_id["context_parent_id"] is None
assert (
bytes_to_ulid(ulid_context_id["context_id_bin"]) == "01ARZ3NDEKTSV4RRFFQ69G5FAV"
)
assert (
ulid_context_id["context_user_id_bin"]
== b"\x94\x00\xfa\xce\xe4W\x11\xea\xa90\x8b\xfd=\x19\xe4t"
)
assert (
bytes_to_ulid(ulid_context_id["context_parent_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FA2"
)
invalid_context_id = states_by_entity_id["state.invalid_context_id"]
assert invalid_context_id["context_id"] is None
assert invalid_context_id["context_user_id"] is None
assert invalid_context_id["context_parent_id"] is None
assert invalid_context_id["context_id_bin"] == b"\x00" * 16
assert invalid_context_id["context_user_id_bin"] is None
assert invalid_context_id["context_parent_id_bin"] is None
garbage_context_id = states_by_entity_id["state.garbage_context_id"]
assert garbage_context_id["context_id"] is None
assert garbage_context_id["context_user_id"] is None
assert garbage_context_id["context_parent_id"] is None
assert garbage_context_id["context_id_bin"] == b"\x00" * 16
assert garbage_context_id["context_user_id_bin"] is None
assert garbage_context_id["context_parent_id_bin"] is None
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
async def test_migrate_event_type_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate event_types to the EventTypes table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1677721632.452529,
),
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1677721632.552529,
),
Events(
event_type="event_type_two",
origin_idx=0,
time_fired_ts=1677721632.552529,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventTypeIDMigrationTask())
await async_recorder_block_till_done(hass)
def _fetch_migrated_events():
with session_scope(hass=hass) as session:
events = (
session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
.filter(
Events.event_type_id.in_(
select_event_type_ids(
(
"event_type_one",
"event_type_two",
)
)
)
)
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
.all()
)
assert len(events) == 3
result = {}
for event in events:
result.setdefault(event.event_type, []).append(
{
"event_id": event.event_id,
"time_fired": event.time_fired,
"event_type": event.event_type,
}
)
return result
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
assert len(events_by_type["event_type_one"]) == 2
assert len(events_by_type["event_type_two"]) == 1
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_migrate_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_states():
with session_scope(hass=hass) as session:
session.add_all(
(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
States(
entity_id="sensor.two",
state="two_2",
last_updated_ts=2.252529,
),
States(
entity_id="sensor.two",
state="two_1",
last_updated_ts=3.152529,
),
)
)
await instance.async_add_executor_job(_insert_states)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDMigrationTask())
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass) as session:
states = (
session.query(
States.state,
States.metadata_id,
States.last_updated_ts,
StatesMeta.entity_id,
)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.all()
)
assert len(states) == 3
result = {}
for state in states:
result.setdefault(state.entity_id, []).append(
{
"state_id": state.entity_id,
"last_updated_ts": state.last_updated_ts,
"state": state.state,
}
)
return result
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
assert len(states_by_entity_id["sensor.two"]) == 2
assert len(states_by_entity_id["sensor.one"]) == 1
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_post_migrate_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
States(
entity_id="sensor.two",
state="two_2",
last_updated_ts=2.252529,
),
States(
entity_id="sensor.two",
state="two_1",
last_updated_ts=3.152529,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDPostMigrationTask())
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass) as session:
states = session.query(
States.state,
States.entity_id,
).all()
assert len(states) == 3
return {state.state: state.entity_id for state in states}
states_by_state = await instance.async_add_executor_job(_fetch_migrated_states)
assert states_by_state["one_1"] is None
assert states_by_state["two_2"] is None
assert states_by_state["two_1"] is None
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_migrate_null_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_states():
with session_scope(hass=hass) as session:
session.add(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
)
session.add_all(
States(
entity_id=None,
state="empty",
last_updated_ts=time + 1.452529,
)
for time in range(1000)
)
session.add(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=2.452529,
),
)
await instance.async_add_executor_job(_insert_states)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDMigrationTask())
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass) as session:
states = (
session.query(
States.state,
States.metadata_id,
States.last_updated_ts,
StatesMeta.entity_id,
)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.all()
)
assert len(states) == 1002
result = {}
for state in states:
result.setdefault(state.entity_id, []).append(
{
"state_id": state.entity_id,
"last_updated_ts": state.last_updated_ts,
"state": state.state,
}
)
return result
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000
assert len(states_by_entity_id["sensor.one"]) == 2
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
async def test_migrate_null_event_type_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate event_types to the EventTypes table when the event_type is NULL."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1.452529,
),
)
session.add_all(
Events(
event_type=None,
origin_idx=0,
time_fired_ts=time + 1.452529,
)
for time in range(1000)
)
session.add(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=2.452529,
),
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventTypeIDMigrationTask())
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
def _fetch_migrated_events():
with session_scope(hass=hass) as session:
events = (
session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
.filter(
Events.event_type_id.in_(
select_event_type_ids(
(
"event_type_one",
migration._EMPTY_EVENT_TYPE,
)
)
)
)
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
.all()
)
assert len(events) == 1002
result = {}
for event in events:
result.setdefault(event.event_type, []).append(
{
"event_id": event.event_id,
"time_fired": event.time_fired,
"event_type": event.event_type,
}
)
return result
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
assert len(events_by_type["event_type_one"]) == 2
assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000

View File

@@ -0,0 +1,745 @@
"""The tests for the recorder filter matching the EntityFilter component."""
# pylint: disable=invalid-name
import importlib
import sys
from unittest.mock import patch
import uuid
import pytest
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import core, migration, statistics
from homeassistant.components.recorder.db_schema import (
Events,
EventTypes,
States,
StatesMeta,
)
from homeassistant.components.recorder.queries import select_event_type_ids
from homeassistant.components.recorder.tasks import (
EntityIDMigrationTask,
EntityIDPostMigrationTask,
EventsContextIDMigrationTask,
EventTypeIDMigrationTask,
StatesContextIDMigrationTask,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from homeassistant.util.ulid import bytes_to_ulid
from .common import async_recorder_block_till_done, async_wait_recording_done
from tests.typing import RecorderInstanceGenerator
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_32"
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
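# The autouse fixture below patches the recorder to the schema 32 module and to
# _create_engine_test above, so every test in this module starts from a database
# laid out like a legacy install.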
@pytest.fixture(autouse=True)
def db_schema_32():
"""Fixture to initialize the db with the old schema."""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch.object(core, "StatesMeta", old_db_schema.StatesMeta), patch.object(
core, "EventTypes", old_db_schema.EventTypes
), patch.object(
core, "EventData", old_db_schema.EventData
), patch.object(
core, "States", old_db_schema.States
), patch.object(
core, "Events", old_db_schema.Events
), patch.object(
core, "StateAttributes", old_db_schema.StateAttributes
), patch.object(
core, "EntityIDMigrationTask", core.RecorderTask
), patch(
CREATE_ENGINE_TARGET, new=_create_engine_test
):
yield
@pytest.fixture(name="legacy_recorder_mock")
async def legacy_recorder_mock_fixture(recorder_mock):
"""Fixture for legacy recorder mock."""
with patch.object(recorder_mock.states_meta_manager, "active", False):
yield recorder_mock
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_migrate_events_context_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
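# Keep both the hex and raw-bytes form of one UUID so the assertions below can
# check that the legacy hex context_id is rewritten to its 16-byte binary form.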
test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex
uuid_bin = test_uuid.bytes
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
Events(
event_type="old_uuid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.452529,
context_id=uuid_hex,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="empty_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id=None,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="ulid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
context_id_bin=None,
context_user_id="9400facee45711eaa9308bfd3d19e474",
context_user_id_bin=None,
context_parent_id="01ARZ3NDEKTSV4RRFFQ69G5FA2",
context_parent_id_bin=None,
),
Events(
event_type="invalid_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="invalid",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
Events(
event_type="garbage_context_id_event",
event_data=None,
origin_idx=0,
time_fired=None,
time_fired_ts=1677721632.552529,
context_id="adapt_lgt:b'5Cf*':interval:b'0R'",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventsContextIDMigrationTask())
await async_recorder_block_till_done(hass)
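# Small helper: flatten an ORM row into a plain dict of column values so the
# per-event assertions below can compare fields directly.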
def _object_as_dict(obj):
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
def _fetch_migrated_events():
with session_scope(hass=hass) as session:
events = (
session.query(Events)
.filter(
Events.event_type.in_(
[
"old_uuid_context_id_event",
"empty_context_id_event",
"ulid_context_id_event",
"invalid_context_id_event",
"garbage_context_id_event",
]
)
)
.all()
)
assert len(events) == 5
return {event.event_type: _object_as_dict(event) for event in events}
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
assert old_uuid_context_id_event["context_id"] is None
assert old_uuid_context_id_event["context_user_id"] is None
assert old_uuid_context_id_event["context_parent_id"] is None
assert old_uuid_context_id_event["context_id_bin"] == uuid_bin
assert old_uuid_context_id_event["context_user_id_bin"] is None
assert old_uuid_context_id_event["context_parent_id_bin"] is None
empty_context_id_event = events_by_type["empty_context_id_event"]
assert empty_context_id_event["context_id"] is None
assert empty_context_id_event["context_user_id"] is None
assert empty_context_id_event["context_parent_id"] is None
assert empty_context_id_event["context_id_bin"] == b"\x00" * 16
assert empty_context_id_event["context_user_id_bin"] is None
assert empty_context_id_event["context_parent_id_bin"] is None
ulid_context_id_event = events_by_type["ulid_context_id_event"]
assert ulid_context_id_event["context_id"] is None
assert ulid_context_id_event["context_user_id"] is None
assert ulid_context_id_event["context_parent_id"] is None
assert (
bytes_to_ulid(ulid_context_id_event["context_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FAV"
)
assert (
ulid_context_id_event["context_user_id_bin"]
== b"\x94\x00\xfa\xce\xe4W\x11\xea\xa90\x8b\xfd=\x19\xe4t"
)
assert (
bytes_to_ulid(ulid_context_id_event["context_parent_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FA2"
)
invalid_context_id_event = events_by_type["invalid_context_id_event"]
assert invalid_context_id_event["context_id"] is None
assert invalid_context_id_event["context_user_id"] is None
assert invalid_context_id_event["context_parent_id"] is None
assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16
assert invalid_context_id_event["context_user_id_bin"] is None
assert invalid_context_id_event["context_parent_id_bin"] is None
garbage_context_id_event = events_by_type["garbage_context_id_event"]
assert garbage_context_id_event["context_id"] is None
assert garbage_context_id_event["context_user_id"] is None
assert garbage_context_id_event["context_parent_id"] is None
assert garbage_context_id_event["context_id_bin"] == b"\x00" * 16
assert garbage_context_id_event["context_user_id_bin"] is None
assert garbage_context_id_event["context_parent_id_bin"] is None
@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_migrate_states_context_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex
uuid_bin = test_uuid.bytes
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
States(
entity_id="state.old_uuid_context_id",
last_updated_ts=1677721632.452529,
context_id=uuid_hex,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
States(
entity_id="state.empty_context_id",
last_updated_ts=1677721632.552529,
context_id=None,
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
States(
entity_id="state.ulid_context_id",
last_updated_ts=1677721632.552529,
context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
context_id_bin=None,
context_user_id="9400facee45711eaa9308bfd3d19e474",
context_user_id_bin=None,
context_parent_id="01ARZ3NDEKTSV4RRFFQ69G5FA2",
context_parent_id_bin=None,
),
States(
entity_id="state.invalid_context_id",
last_updated_ts=1677721632.552529,
context_id="invalid",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
States(
entity_id="state.garbage_context_id",
last_updated_ts=1677721632.552529,
context_id="adapt_lgt:b'5Cf*':interval:b'0R'",
context_id_bin=None,
context_user_id=None,
context_user_id_bin=None,
context_parent_id=None,
context_parent_id_bin=None,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(StatesContextIDMigrationTask())
await async_recorder_block_till_done(hass)
def _object_as_dict(obj):
return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
def _fetch_migrated_states():
with session_scope(hass=hass) as session:
events = (
session.query(States)
.filter(
States.entity_id.in_(
[
"state.old_uuid_context_id",
"state.empty_context_id",
"state.ulid_context_id",
"state.invalid_context_id",
"state.garbage_context_id",
]
)
)
.all()
)
assert len(events) == 5
return {state.entity_id: _object_as_dict(state) for state in events}
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"]
assert old_uuid_context_id["context_id"] is None
assert old_uuid_context_id["context_user_id"] is None
assert old_uuid_context_id["context_parent_id"] is None
assert old_uuid_context_id["context_id_bin"] == uuid_bin
assert old_uuid_context_id["context_user_id_bin"] is None
assert old_uuid_context_id["context_parent_id_bin"] is None
empty_context_id = states_by_entity_id["state.empty_context_id"]
assert empty_context_id["context_id"] is None
assert empty_context_id["context_user_id"] is None
assert empty_context_id["context_parent_id"] is None
assert empty_context_id["context_id_bin"] == b"\x00" * 16
assert empty_context_id["context_user_id_bin"] is None
assert empty_context_id["context_parent_id_bin"] is None
ulid_context_id = states_by_entity_id["state.ulid_context_id"]
assert ulid_context_id["context_id"] is None
assert ulid_context_id["context_user_id"] is None
assert ulid_context_id["context_parent_id"] is None
assert (
bytes_to_ulid(ulid_context_id["context_id_bin"]) == "01ARZ3NDEKTSV4RRFFQ69G5FAV"
)
assert (
ulid_context_id["context_user_id_bin"]
== b"\x94\x00\xfa\xce\xe4W\x11\xea\xa90\x8b\xfd=\x19\xe4t"
)
assert (
bytes_to_ulid(ulid_context_id["context_parent_id_bin"])
== "01ARZ3NDEKTSV4RRFFQ69G5FA2"
)
invalid_context_id = states_by_entity_id["state.invalid_context_id"]
assert invalid_context_id["context_id"] is None
assert invalid_context_id["context_user_id"] is None
assert invalid_context_id["context_parent_id"] is None
assert invalid_context_id["context_id_bin"] == b"\x00" * 16
assert invalid_context_id["context_user_id_bin"] is None
assert invalid_context_id["context_parent_id_bin"] is None
garbage_context_id = states_by_entity_id["state.garbage_context_id"]
assert garbage_context_id["context_id"] is None
assert garbage_context_id["context_user_id"] is None
assert garbage_context_id["context_parent_id"] is None
assert garbage_context_id["context_id_bin"] == b"\x00" * 16
assert garbage_context_id["context_user_id_bin"] is None
assert garbage_context_id["context_parent_id_bin"] is None
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
async def test_migrate_event_type_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate event_types to the EventTypes table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1677721632.452529,
),
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1677721632.552529,
),
Events(
event_type="event_type_two",
origin_idx=0,
time_fired_ts=1677721632.552529,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventTypeIDMigrationTask())
await async_recorder_block_till_done(hass)
def _fetch_migrated_events():
with session_scope(hass=hass, read_only=True) as session:
events = (
session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
.filter(
Events.event_type_id.in_(
select_event_type_ids(
(
"event_type_one",
"event_type_two",
)
)
)
)
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
.all()
)
assert len(events) == 3
result = {}
for event in events:
result.setdefault(event.event_type, []).append(
{
"event_id": event.event_id,
"time_fired": event.time_fired,
"event_type": event.event_type,
}
)
return result
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
assert len(events_by_type["event_type_one"]) == 2
assert len(events_by_type["event_type_two"]) == 1
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_migrate_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_states():
with session_scope(hass=hass) as session:
session.add_all(
(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
States(
entity_id="sensor.two",
state="two_2",
last_updated_ts=2.252529,
),
States(
entity_id="sensor.two",
state="two_1",
last_updated_ts=3.152529,
),
)
)
await instance.async_add_executor_job(_insert_states)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDMigrationTask())
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass, read_only=True) as session:
states = (
session.query(
States.state,
States.metadata_id,
States.last_updated_ts,
StatesMeta.entity_id,
)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.all()
)
assert len(states) == 3
result = {}
for state in states:
result.setdefault(state.entity_id, []).append(
{
"state_id": state.entity_id,
"last_updated_ts": state.last_updated_ts,
"state": state.state,
}
)
return result
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
assert len(states_by_entity_id["sensor.two"]) == 2
assert len(states_by_entity_id["sensor.one"]) == 1
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_post_migrate_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add_all(
(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
States(
entity_id="sensor.two",
state="two_2",
last_updated_ts=2.252529,
),
States(
entity_id="sensor.two",
state="two_1",
last_updated_ts=3.152529,
),
)
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDPostMigrationTask())
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass, read_only=True) as session:
states = session.query(
States.state,
States.entity_id,
).all()
assert len(states) == 3
return {state.state: state.entity_id for state in states}
states_by_state = await instance.async_add_executor_job(_fetch_migrated_states)
assert states_by_state["one_1"] is None
assert states_by_state["two_2"] is None
assert states_by_state["two_1"] is None
@pytest.mark.parametrize("enable_migrate_entity_ids", [True])
async def test_migrate_null_entity_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate entity_ids to the StatesMeta table."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_states():
with session_scope(hass=hass) as session:
session.add(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=1.452529,
),
)
session.add_all(
States(
entity_id=None,
state="empty",
last_updated_ts=time + 1.452529,
)
for time in range(1000)
)
session.add(
States(
entity_id="sensor.one",
state="one_1",
last_updated_ts=2.452529,
),
)
await instance.async_add_executor_job(_insert_states)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EntityIDMigrationTask())
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
def _fetch_migrated_states():
with session_scope(hass=hass, read_only=True) as session:
states = (
session.query(
States.state,
States.metadata_id,
States.last_updated_ts,
StatesMeta.entity_id,
)
.outerjoin(StatesMeta, States.metadata_id == StatesMeta.metadata_id)
.all()
)
assert len(states) == 1002
result = {}
for state in states:
result.setdefault(state.entity_id, []).append(
{
"state_id": state.entity_id,
"last_updated_ts": state.last_updated_ts,
"state": state.state,
}
)
return result
states_by_entity_id = await instance.async_add_executor_job(_fetch_migrated_states)
assert len(states_by_entity_id[migration._EMPTY_ENTITY_ID]) == 1000
assert len(states_by_entity_id["sensor.one"]) == 2
@pytest.mark.parametrize("enable_migrate_event_type_ids", [True])
async def test_migrate_null_event_type_ids(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test we can migrate event_types to the EventTypes table when the event_type is NULL."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
def _insert_events():
with session_scope(hass=hass) as session:
session.add(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=1.452529,
),
)
session.add_all(
Events(
event_type=None,
origin_idx=0,
time_fired_ts=time + 1.452529,
)
for time in range(1000)
)
session.add(
Events(
event_type="event_type_one",
origin_idx=0,
time_fired_ts=2.452529,
),
)
await instance.async_add_executor_job(_insert_events)
await async_wait_recording_done(hass)
# This is a threadsafe way to add a task to the recorder
instance.queue_task(EventTypeIDMigrationTask())
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
def _fetch_migrated_events():
with session_scope(hass=hass, read_only=True) as session:
events = (
session.query(Events.event_id, Events.time_fired, EventTypes.event_type)
.filter(
Events.event_type_id.in_(
select_event_type_ids(
(
"event_type_one",
migration._EMPTY_EVENT_TYPE,
)
)
)
)
.outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
.all()
)
assert len(events) == 1002
result = {}
for event in events:
result.setdefault(event.event_type, []).append(
{
"event_id": event.event_id,
"time_fired": event.time_fired,
"event_type": event.event_type,
}
)
return result
events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
assert len(events_by_type["event_type_one"]) == 2
assert len(events_by_type[migration._EMPTY_EVENT_TYPE]) == 1000

View File

@@ -10,13 +10,12 @@ from sqlalchemy.exc import DatabaseError, OperationalError
from sqlalchemy.orm.session import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import Recorder, migration
from homeassistant.components.recorder import purge, queries
from homeassistant.components.recorder.const import (
SQLITE_MAX_BIND_VARS,
SupportedDialect,
)
from homeassistant.components.recorder.db_schema import (
EventData,
Events,
EventTypes,
RecorderRuns,
@ -37,18 +36,29 @@ from homeassistant.components.recorder.tasks import PurgeTask
from homeassistant.components.recorder.util import session_scope
from homeassistant.const import EVENT_STATE_CHANGED, EVENT_THEMES_UPDATED, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.json import json_dumps
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import dt as dt_util
from homeassistant.util.json import json_loads
from .common import (
async_recorder_block_till_done,
async_wait_purge_done,
async_wait_recording_done,
convert_pending_events_to_event_types,
convert_pending_states_to_meta,
)
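# convert_pending_events_to_event_types / convert_pending_states_to_meta (now
# shared in .common) rewrite pending rows to use event_type_id / metadata_id,
# mirroring the local helpers this file used to define.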
from tests.typing import RecorderInstanceGenerator
TEST_EVENT_TYPES = (
"EVENT_TEST_AUTOPURGE",
"EVENT_TEST_PURGE",
"EVENT_TEST",
"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA",
"EVENT_TEST_PURGE_WITH_EVENT_DATA",
"EVENT_TEST_WITH_EVENT_DATA",
)
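# With event_type normalized into the EventTypes table, the purge tests below
# look rows up via the id column instead of the legacy string, e.g.
# (illustrative only):
#     session.query(Events).filter(
#         Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
#     )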
@pytest.fixture(name="use_sqlite")
def mock_use_sqlite(request):
@ -253,7 +263,9 @@ async def test_purge_old_events(
await _add_test_events(hass)
with session_scope(hass=hass) as session:
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 6
purge_before = dt_util.utcnow() - timedelta(days=4)
@ -267,7 +279,8 @@ async def test_purge_old_events(
states_batch_size=1,
)
assert not finished
assert events.count() == 2
all_events = events.all()
assert events.count() == 2, f"Should have 2 events left: {all_events}"
# we should only have 2 events left
finished = purge_old_data(
@ -377,7 +390,9 @@ async def test_purge_method(
states = session.query(States)
assert states.count() == 6
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 6
statistics = session.query(StatisticsShortTerm)
@ -408,7 +423,9 @@ async def test_purge_method(
with session_scope(hass=hass) as session:
states = session.query(States)
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
statistics = session.query(StatisticsShortTerm)
# only purged old states, events and statistics
@ -425,7 +442,9 @@ async def test_purge_method(
with session_scope(hass=hass) as session:
states = session.query(States)
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
statistics = session.query(StatisticsShortTerm)
recorder_runs = session.query(RecorderRuns)
statistics_runs = session.query(StatisticsRuns)
@ -497,6 +516,9 @@ async def test_purge_edge_case(
attributes_id=1002,
)
)
instance = recorder.get_instance(hass)
convert_pending_events_to_event_types(instance, session)
convert_pending_states_to_meta(instance, session)
await async_setup_recorder_instance(hass, None)
await async_wait_purge_done(hass)
@ -512,7 +534,9 @@ async def test_purge_edge_case(
state_attributes = session.query(StateAttributes)
assert state_attributes.count() == 1
events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 1
await hass.services.async_call(recorder.DOMAIN, SERVICE_PURGE, service_data)
@ -524,7 +548,9 @@ async def test_purge_edge_case(
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 0
events = session.query(Events).filter(Events.event_type == "EVENT_TEST_PURGE")
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
assert events.count() == 0
@ -594,6 +620,8 @@ async def test_purge_cutoff_date(
attributes_id=1000 + row,
)
)
convert_pending_events_to_event_types(instance, session)
convert_pending_states_to_meta(instance, session)
instance = await async_setup_recorder_instance(hass, None)
await async_wait_purge_done(hass)
@ -608,7 +636,6 @@ async def test_purge_cutoff_date(
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
events = session.query(Events)
assert states.filter(States.state == "purge").count() == rows - 1
assert states.filter(States.state == "keep").count() == 1
assert (
@ -619,8 +646,18 @@ async def test_purge_cutoff_date(
.count()
== 1
)
assert events.filter(Events.event_type == "PURGE").count() == rows - 1
assert events.filter(Events.event_type == "KEEP").count() == 1
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("PURGE",))))
.count()
== rows - 1
)
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("KEEP",))))
.count()
== 1
)
instance.queue_task(PurgeTask(cutoff, repack=False, apply_filter=False))
await hass.async_block_till_done()
@ -630,7 +667,7 @@ async def test_purge_cutoff_date(
with session_scope(hass=hass) as session:
states = session.query(States)
state_attributes = session.query(StateAttributes)
events = session.query(Events)
session.query(Events)
assert states.filter(States.state == "purge").count() == 0
assert (
state_attributes.outerjoin(
@ -649,8 +686,18 @@ async def test_purge_cutoff_date(
.count()
== 1
)
assert events.filter(Events.event_type == "PURGE").count() == 0
assert events.filter(Events.event_type == "KEEP").count() == 1
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("PURGE",))))
.count()
== 0
)
assert (
session.query(Events)
.filter(Events.event_type_id.in_(select_event_type_ids(("KEEP",))))
.count()
== 1
)
# Make sure we can purge everything
instance.queue_task(PurgeTask(dt_util.utcnow(), repack=False, apply_filter=False))
@ -675,58 +722,6 @@ async def test_purge_cutoff_date(
assert state_attributes.count() == 0
def _convert_pending_states_to_meta(instance: Recorder, session: Session) -> None:
"""Convert pending states to use states_metadata."""
entity_ids: set[str] = set()
states: set[States] = set()
states_meta_objects: dict[str, StatesMeta] = {}
for object in session:
if isinstance(object, States):
entity_ids.add(object.entity_id)
states.add(object)
entity_id_to_metadata_ids = instance.states_meta_manager.get_many(
entity_ids, session, True
)
for state in states:
entity_id = state.entity_id
state.entity_id = None
if metadata_id := entity_id_to_metadata_ids.get(entity_id):
state.metadata_id = metadata_id
continue
if entity_id not in states_meta_objects:
states_meta_objects[entity_id] = StatesMeta(entity_id=entity_id)
state.states_meta_rel = states_meta_objects[entity_id]
def _convert_pending_events_to_event_types(
instance: Recorder, session: Session
) -> None:
"""Convert pending events to use event_type_ids."""
event_types: set[str] = set()
events: set[Events] = set()
event_types_objects: dict[str, EventTypes] = {}
for object in session:
if isinstance(object, Events):
event_types.add(object.event_type)
events.add(object)
event_type_to_event_type_ids = instance.event_type_manager.get_many(
event_types, session
)
for event in events:
event_type = event.event_type
event.event_type = None
if event_type_id := event_type_to_event_type_ids.get(event_type):
event.event_type_id = event_type_id
continue
if event_type not in event_types_objects:
event_types_objects[event_type] = EventTypes(event_type=event_type)
event.event_type_rel = event_types_objects[event_type]
@pytest.mark.parametrize("use_sqlite", (True, False), indirect=True)
async def test_purge_filtered_states(
async_setup_recorder_instance: RecorderInstanceGenerator,
@ -744,7 +739,7 @@ async def test_purge_filtered_states(
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.excluded",
"purgeme",
@ -765,7 +760,7 @@ async def test_purge_filtered_states(
# Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=2)
for event_id in range(200, 210):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.keep",
"keep",
@ -819,7 +814,8 @@ async def test_purge_filtered_states(
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
_convert_pending_states_to_meta(instance, session)
convert_pending_states_to_meta(instance, session)
convert_pending_events_to_event_types(instance, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
@ -827,12 +823,9 @@ async def test_purge_filtered_states(
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 74
events_state_changed = session.query(Events).filter(
Events.event_type == EVENT_STATE_CHANGED
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
assert events_state_changed.count() == 70
assert events_keep.count() == 1
# Normal purge doesn't remove excluded entities
@ -845,11 +838,9 @@ async def test_purge_filtered_states(
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 74
events_state_changed = session.query(Events).filter(
Events.event_type == EVENT_STATE_CHANGED
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
assert events_state_changed.count() == 70
events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
assert events_keep.count() == 1
# Test with 'apply_filter' = True
@ -866,11 +857,9 @@ async def test_purge_filtered_states(
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 13
events_state_changed = session.query(Events).filter(
Events.event_type == EVENT_STATE_CHANGED
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_KEEP",)))
)
assert events_state_changed.count() == 10
events_keep = session.query(Events).filter(Events.event_type == "EVENT_KEEP")
assert events_keep.count() == 1
states_sensor_excluded = (
@ -945,14 +934,14 @@ async def test_purge_filtered_states_to_empty(
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.excluded",
"purgeme",
timestamp,
event_id * days,
)
_convert_pending_states_to_meta(instance, session)
convert_pending_states_to_meta(instance, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
@ -1028,7 +1017,8 @@ async def test_purge_without_state_attributes_filtered_states_to_empty(
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
_convert_pending_states_to_meta(instance, session)
convert_pending_states_to_meta(instance, session)
convert_pending_events_to_event_types(instance, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
@ -1085,14 +1075,15 @@ async def test_purge_filtered_events(
# Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=1)
for event_id in range(200, 210):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
_convert_pending_events_to_event_types(instance, session)
convert_pending_events_to_event_types(instance, session)
convert_pending_states_to_meta(instance, session)
service_data = {"keep_days": 10}
_add_db_entries(hass)
@ -1101,13 +1092,9 @@ async def test_purge_filtered_events(
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
)
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
)
states = session.query(States)
assert events_purge.count() == 60
assert events_keep.count() == 10
assert states.count() == 10
# Normal purge doesn't remove excluded events
@ -1121,12 +1108,8 @@ async def test_purge_filtered_events(
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
)
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
)
states = session.query(States)
assert events_purge.count() == 60
assert events_keep.count() == 10
assert states.count() == 10
# Test with 'apply_filter' = True
@ -1144,12 +1127,8 @@ async def test_purge_filtered_events(
events_purge = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(("EVENT_PURGE",)))
)
events_keep = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids((EVENT_STATE_CHANGED,)))
)
states = session.query(States)
assert events_purge.count() == 0
assert events_keep.count() == 10
assert states.count() == 10
@ -1177,7 +1156,7 @@ async def test_purge_filtered_events_state_changed(
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.excluded",
"purgeme",
@ -1242,8 +1221,8 @@ async def test_purge_filtered_events_state_changed(
last_updated_ts=dt_util.utc_to_timestamp(timestamp),
)
)
_convert_pending_events_to_event_types(instance, session)
_convert_pending_states_to_meta(instance, session)
convert_pending_events_to_event_types(instance, session)
convert_pending_states_to_meta(instance, session)
service_data = {"keep_days": 10, "apply_filter": True}
_add_db_entries(hass)
@ -1322,7 +1301,7 @@ async def test_purge_entities(
for days in range(1, 4):
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(1000, 1020):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.purge_entity",
"purgeme",
@ -1331,7 +1310,7 @@ async def test_purge_entities(
)
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(10000, 10020):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"purge_domain.entity",
"purgeme",
@ -1340,28 +1319,30 @@ async def test_purge_entities(
)
timestamp = dt_util.utcnow() - timedelta(days=days)
for event_id in range(100000, 100020):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"binary_sensor.purge_glob",
"purgeme",
timestamp,
event_id * days,
)
_convert_pending_states_to_meta(instance, session)
convert_pending_states_to_meta(instance, session)
convert_pending_events_to_event_types(instance, session)
def _add_keep_records(hass: HomeAssistant) -> None:
with session_scope(hass=hass) as session:
# Add states and state_changed events that should be kept
timestamp = dt_util.utcnow() - timedelta(days=2)
for event_id in range(200, 210):
_add_state_and_state_changed_event(
_add_state_with_state_attributes(
session,
"sensor.keep",
"keep",
timestamp,
event_id,
)
_convert_pending_states_to_meta(instance, session)
convert_pending_states_to_meta(instance, session)
convert_pending_events_to_event_types(instance, session)
_add_purge_records(hass)
_add_keep_records(hass)
@ -1425,7 +1406,7 @@ async def test_purge_entities(
await _purge_entities(hass, [], [], [])
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
states = session.query(States)
assert states.count() == 0
@ -1470,70 +1451,50 @@ async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
for _ in range(iterations):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE"
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE"
else:
timestamp = utcnow
event_type = "EVENT_TEST"
with freeze_time(timestamp):
hass.bus.async_fire(event_type, event_data)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
for _ in range(iterations):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE"
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE"
else:
timestamp = utcnow
event_type = "EVENT_TEST"
session.add(
Events(
event_type=event_type,
event_data=json.dumps(event_data),
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
async def _add_events_with_event_data(hass: HomeAssistant, iterations: int = 1):
"""Add a few events with linked event_data for testing."""
utcnow = dt_util.utcnow()
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
await hass.async_block_till_done()
for _ in range(iterations):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"
shared_data = '{"type":{"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"}'
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE_WITH_EVENT_DATA"
shared_data = '{"type":{"EVENT_TEST_PURGE_WITH_EVENT_DATA"}'
else:
timestamp = utcnow
event_type = "EVENT_TEST_WITH_EVENT_DATA"
shared_data = '{"type":{"EVENT_TEST_WITH_EVENT_DATA"}'
with freeze_time(timestamp):
hass.bus.async_fire(event_type, json_loads(shared_data))
await async_wait_recording_done(hass)
with session_scope(hass=hass) as session:
for _ in range(iterations):
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"
shared_data = '{"type":{"EVENT_TEST_AUTOPURGE_WITH_EVENT_DATA"}'
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE_WITH_EVENT_DATA"
shared_data = '{"type":{"EVENT_TEST_PURGE_WITH_EVENT_DATA"}'
else:
timestamp = utcnow
event_type = "EVENT_TEST_WITH_EVENT_DATA"
shared_data = '{"type":{"EVENT_TEST_WITH_EVENT_DATA"}'
event_data = EventData(hash=1234, shared_data=shared_data)
session.add(
Events(
event_type=event_type,
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
event_data_rel=event_data,
)
)
async def _add_test_statistics(hass: HomeAssistant):
"""Add multiple statistics to the db for testing."""
@ -1639,7 +1600,7 @@ def _add_state_without_event_linkage(
)
def _add_state_and_state_changed_event(
def _add_state_with_state_attributes(
session: Session,
entity_id: str,
state: str,
@ -1662,168 +1623,60 @@ def _add_state_and_state_changed_event(
state_attributes=state_attrs,
)
)
session.add(
Events(
event_id=event_id,
event_type=EVENT_STATE_CHANGED,
event_data="{}",
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)
async def test_purge_many_old_events(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test deleting old events."""
instance = await async_setup_recorder_instance(hass)
old_events_count = 5
with patch.object(queries, "SQLITE_MAX_BIND_VARS", old_events_count), patch.object(
purge, "SQLITE_MAX_BIND_VARS", old_events_count
):
instance = await async_setup_recorder_instance(hass)
await _add_test_events(hass, SQLITE_MAX_BIND_VARS)
await _add_test_events(hass, old_events_count)
with session_scope(hass=hass) as session:
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == SQLITE_MAX_BIND_VARS * 6
purge_before = dt_util.utcnow() - timedelta(days=4)
# run purge_old_data()
finished = purge_old_data(
instance,
purge_before,
repack=False,
states_batch_size=3,
events_batch_size=3,
)
assert not finished
assert events.count() == SQLITE_MAX_BIND_VARS * 3
# we should only have 2 groups of events left
finished = purge_old_data(
instance,
purge_before,
repack=False,
states_batch_size=3,
events_batch_size=3,
)
assert finished
assert events.count() == SQLITE_MAX_BIND_VARS * 2
# we should now purge everything
finished = purge_old_data(
instance,
dt_util.utcnow(),
repack=False,
states_batch_size=20,
events_batch_size=20,
)
assert finished
assert events.count() == 0
async def test_purge_can_mix_legacy_and_new_format(
async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
) -> None:
"""Test purging with legacy a new events."""
instance = await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
# New databases are no longer created with the legacy events index
assert instance.use_legacy_events_index is False
def _recreate_legacy_events_index():
"""Recreate the legacy events index since its no longer created on new instances."""
migration._create_index(instance.get_session, "states", "ix_states_event_id")
instance.use_legacy_events_index = True
await instance.async_add_executor_job(_recreate_legacy_events_index)
assert instance.use_legacy_events_index is True
utcnow = dt_util.utcnow()
eleven_days_ago = utcnow - timedelta(days=11)
with session_scope(hass=hass) as session:
broken_state_no_time = States(
event_id=None,
entity_id="orphened.state",
last_updated_ts=None,
last_changed_ts=None,
)
session.add(broken_state_no_time)
start_id = 50000
for event_id in range(start_id, start_id + 50):
_add_state_and_state_changed_event(
session,
"sensor.excluded",
"purgeme",
eleven_days_ago,
event_id,
with session_scope(hass=hass) as session:
events = session.query(Events).filter(
Events.event_type_id.in_(select_event_type_ids(TEST_EVENT_TYPES))
)
await _add_test_events(hass, 50)
await _add_events_with_event_data(hass, 50)
with session_scope(hass=hass) as session:
for _ in range(50):
_add_state_without_event_linkage(
session, "switch.random", "on", eleven_days_ago
assert events.count() == old_events_count * 6
purge_before = dt_util.utcnow() - timedelta(days=4)
# run purge_old_data()
finished = purge_old_data(
instance,
purge_before,
repack=False,
states_batch_size=3,
events_batch_size=3,
)
states_with_event_id = session.query(States).filter(
States.event_id.is_not(None)
)
states_without_event_id = session.query(States).filter(
States.event_id.is_(None)
)
assert not finished
assert events.count() == old_events_count * 3
assert states_with_event_id.count() == 50
assert states_without_event_id.count() == 51
# we should only have 2 groups of events left
finished = purge_old_data(
instance,
purge_before,
repack=False,
states_batch_size=3,
events_batch_size=3,
)
assert finished
assert events.count() == old_events_count * 2
purge_before = dt_util.utcnow() - timedelta(days=4)
finished = purge_old_data(
instance,
purge_before,
repack=False,
)
assert not finished
assert states_with_event_id.count() == 0
assert states_without_event_id.count() == 51
# At this point all the legacy states are gone
# and we switch methods
purge_before = dt_util.utcnow() - timedelta(days=4)
finished = purge_old_data(
instance,
purge_before,
repack=False,
events_batch_size=1,
states_batch_size=1,
)
# Since we only allow one iteration, we won't
# check if we are finished with this loop, similar
# to the legacy method
assert not finished
assert states_with_event_id.count() == 0
assert states_without_event_id.count() == 1
finished = purge_old_data(
instance,
purge_before,
repack=False,
events_batch_size=100,
states_batch_size=100,
)
assert finished
assert states_with_event_id.count() == 0
assert states_without_event_id.count() == 1
_add_state_without_event_linkage(
session, "switch.random", "on", eleven_days_ago
)
assert states_with_event_id.count() == 0
assert states_without_event_id.count() == 2
finished = purge_old_data(
instance,
purge_before,
repack=False,
)
assert finished
# The broken state without a timestamp
# does not prevent future purges. It's ignored.
assert states_with_event_id.count() == 0
assert states_without_event_id.count() == 1
# we should now purge everything
finished = purge_old_data(
instance,
dt_util.utcnow(),
repack=False,
states_batch_size=20,
events_batch_size=20,
)
assert finished
assert events.count() == 0
async def test_purge_old_events_purges_the_event_type_ids(
@ -1837,7 +1690,6 @@ async def test_purge_old_events_purges_the_event_type_ids(
five_days_ago = utcnow - timedelta(days=5)
eleven_days_ago = utcnow - timedelta(days=11)
far_past = utcnow - timedelta(days=1000)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
await hass.async_block_till_done()
await async_wait_recording_done(hass)
@ -1873,8 +1725,6 @@ async def test_purge_old_events_purges_the_event_type_ids(
Events(
event_type=None,
event_type_id=event_type.event_type_id,
event_data=json_dumps(event_data),
origin="LOCAL",
time_fired_ts=dt_util.utc_to_timestamp(timestamp),
)
)

File diff suppressed because it is too large

View File

@@ -3,6 +3,8 @@
The v23 schema used for these tests has been slightly modified to add the
EventData table to allow the recorder to start up successfully.
"""
from functools import partial
# pylint: disable=invalid-name
import importlib
import json
@ -11,46 +13,27 @@ from unittest.mock import patch
import py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import SQLITE_URL_PREFIX, statistics
from homeassistant.components.recorder import SQLITE_URL_PREFIX
from homeassistant.components.recorder.util import session_scope
from homeassistant.helpers import recorder as recorder_helper
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from .common import wait_recording_done
from .common import (
CREATE_ENGINE_TARGET,
create_engine_test_for_schema_version_postfix,
get_schema_module_path,
wait_recording_done,
)
from tests.common import get_test_home_assistant
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_23_with_newer_columns"
def _create_engine_test(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
importlib.import_module(SCHEMA_MODULE)
old_db_schema = sys.modules[SCHEMA_MODULE]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
SCHEMA_VERSION_POSTFIX = "23_with_newer_columns"
SCHEMA_MODULE = get_schema_module_path(SCHEMA_VERSION_POSTFIX)
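# The patches below bind the schema postfix with functools.partial so the shared
# create_engine_test_for_schema_version_postfix helper from .common builds a v23
# database for each test, replacing the local _create_engine_test that was here.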
def test_delete_duplicates(
@ -182,7 +165,13 @@ def test_delete_duplicates(
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
), patch(
CREATE_ENGINE_TARGET,
new=partial(
create_engine_test_for_schema_version_postfix,
schema_version_postfix=SCHEMA_VERSION_POSTFIX,
),
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -354,7 +343,13 @@ def test_delete_duplicates_many(
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
), patch(
CREATE_ENGINE_TARGET,
new=partial(
create_engine_test_for_schema_version_postfix,
schema_version_postfix=SCHEMA_VERSION_POSTFIX,
),
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -503,7 +498,13 @@ def test_delete_duplicates_non_identical(
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
), patch(
CREATE_ENGINE_TARGET,
new=partial(
create_engine_test_for_schema_version_postfix,
schema_version_postfix=SCHEMA_VERSION_POSTFIX,
),
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
@ -607,7 +608,13 @@ def test_delete_duplicates_short_term(
# Create some duplicated statistics with schema version 23
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(CREATE_ENGINE_TARGET, new=_create_engine_test):
), patch(
CREATE_ENGINE_TARGET,
new=partial(
create_engine_test_for_schema_version_postfix,
schema_version_postfix=SCHEMA_VERSION_POSTFIX,
),
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})

View File

@@ -4657,7 +4657,7 @@ async def test_validate_statistics_unit_change_no_conversion(
assert response["result"] == expected_result
async def assert_statistic_ids(expected_result):
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(StatisticsMeta))
assert len(db_states) == len(expected_result)
for i in range(len(db_states)):
@ -4792,7 +4792,7 @@ async def test_validate_statistics_unit_change_equivalent_units(
assert response["result"] == expected_result
async def assert_statistic_ids(expected_result):
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(StatisticsMeta))
assert len(db_states) == len(expected_result)
for i in range(len(db_states)):
@ -4878,7 +4878,7 @@ async def test_validate_statistics_unit_change_equivalent_units_2(
assert response["result"] == expected_result
async def assert_statistic_ids(expected_result):
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
db_states = list(session.query(StatisticsMeta))
assert len(db_states) == len(expected_result)
for i in range(len(db_states)):
@ -5143,7 +5143,7 @@ async def test_exclude_attributes(
await async_wait_recording_done(hass)
def _fetch_states() -> list[State]:
with session_scope(hass=hass) as session:
with session_scope(hass=hass, read_only=True) as session:
native_states = []
for db_state, db_state_attributes, db_states_meta in (
session.query(States, StateAttributes, StatesMeta)