diff --git a/homeassistant/components/logbook/models.py b/homeassistant/components/logbook/models.py index 3fc4b5dac8b..a5ce9eddcec 100644 --- a/homeassistant/components/logbook/models.py +++ b/homeassistant/components/logbook/models.py @@ -2,14 +2,21 @@ from __future__ import annotations from dataclasses import dataclass -import json from typing import Any, cast from sqlalchemy.engine.row import Row +from homeassistant.components.recorder.models import ( + bytes_to_ulid_or_none, + bytes_to_uuid_hex_or_none, + ulid_to_bytes_or_none, + uuid_hex_to_bytes_or_none, +) from homeassistant.const import ATTR_ICON, EVENT_STATE_CHANGED from homeassistant.core import Context, Event, State, callback import homeassistant.util.dt as dt_util +from homeassistant.util.json import json_loads +from homeassistant.util.ulid import ulid_to_bytes class LazyEventPartialState: @@ -22,9 +29,9 @@ class LazyEventPartialState: "event_type", "entity_id", "state", - "context_id", - "context_user_id", - "context_parent_id", + "context_id_bin", + "context_user_id_bin", + "context_parent_id_bin", "data", ] @@ -40,9 +47,9 @@ class LazyEventPartialState: self.event_type: str | None = self.row.event_type self.entity_id: str | None = self.row.entity_id self.state = self.row.state - self.context_id: str | None = self.row.context_id - self.context_user_id: str | None = self.row.context_user_id - self.context_parent_id: str | None = self.row.context_parent_id + self.context_id_bin: bytes | None = self.row.context_id_bin + self.context_user_id_bin: bytes | None = self.row.context_user_id_bin + self.context_parent_id_bin: bytes | None = self.row.context_parent_id_bin if data := getattr(row, "data", None): # If its an EventAsRow we can avoid the whole # json decode process as we already have the data @@ -55,9 +62,24 @@ class LazyEventPartialState: self.data = event_data else: self.data = self._event_data_cache[source] = cast( - dict[str, Any], json.loads(source) + dict[str, Any], json_loads(source) ) + @property + def context_id(self) -> str | None: + """Return the context id.""" + return bytes_to_ulid_or_none(self.context_id_bin) + + @property + def context_user_id(self) -> str | None: + """Return the context user id.""" + return bytes_to_uuid_hex_or_none(self.context_user_id_bin) + + @property + def context_parent_id(self) -> str | None: + """Return the context parent id.""" + return bytes_to_ulid_or_none(self.context_parent_id_bin) + @dataclass(frozen=True) class EventAsRow: @@ -65,7 +87,7 @@ class EventAsRow: data: dict[str, Any] context: Context - context_id: str + context_id_bin: bytes time_fired_ts: float state_id: int event_data: str | None = None @@ -73,8 +95,8 @@ class EventAsRow: event_id: None = None entity_id: str | None = None icon: str | None = None - context_user_id: str | None = None - context_parent_id: str | None = None + context_user_id_bin: bytes | None = None + context_parent_id_bin: bytes | None = None event_type: str | None = None state: str | None = None shared_data: str | None = None @@ -85,13 +107,14 @@ class EventAsRow: def async_event_to_row(event: Event) -> EventAsRow: """Convert an event to a row.""" if event.event_type != EVENT_STATE_CHANGED: + context = event.context return EventAsRow( data=event.data, context=event.context, event_type=event.event_type, - context_id=event.context.id, - context_user_id=event.context.user_id, - context_parent_id=event.context.parent_id, + context_id_bin=ulid_to_bytes(context.id), + context_user_id_bin=uuid_hex_to_bytes_or_none(context.user_id), + 
context_parent_id_bin=ulid_to_bytes_or_none(context.parent_id), time_fired_ts=dt_util.utc_to_timestamp(event.time_fired), state_id=hash(event), ) @@ -99,14 +122,15 @@ def async_event_to_row(event: Event) -> EventAsRow: # that are missing new_state or old_state # since the logbook does not show these new_state: State = event.data["new_state"] + context = new_state.context return EventAsRow( data=event.data, context=event.context, entity_id=new_state.entity_id, state=new_state.state, - context_id=new_state.context.id, - context_user_id=new_state.context.user_id, - context_parent_id=new_state.context.parent_id, + context_id_bin=ulid_to_bytes(context.id), + context_user_id_bin=uuid_hex_to_bytes_or_none(context.user_id), + context_parent_id_bin=ulid_to_bytes_or_none(context.parent_id), time_fired_ts=dt_util.utc_to_timestamp(new_state.last_updated), state_id=hash(event), icon=new_state.attributes.get(ATTR_ICON), diff --git a/homeassistant/components/logbook/processor.py b/homeassistant/components/logbook/processor.py index 289ee677a21..39d8e920aa4 100644 --- a/homeassistant/components/logbook/processor.py +++ b/homeassistant/components/logbook/processor.py @@ -12,6 +12,7 @@ from sqlalchemy.engine.row import Row from homeassistant.components.recorder.filters import Filters from homeassistant.components.recorder.models import ( + bytes_to_uuid_hex_or_none, process_datetime_to_timestamp, process_timestamp_to_utc_isoformat, ) @@ -261,14 +262,14 @@ class ContextLookup: """Memorize context origin.""" self.hass = hass self._memorize_new = True - self._lookup: dict[str | None, Row | EventAsRow | None] = {None: None} + self._lookup: dict[bytes | None, Row | EventAsRow | None] = {None: None} - def memorize(self, row: Row | EventAsRow) -> str | None: + def memorize(self, row: Row | EventAsRow) -> bytes | None: """Memorize a context from the database.""" if self._memorize_new: - context_id: str = row.context_id - self._lookup.setdefault(context_id, row) - return context_id + context_id_bin: bytes = row.context_id_bin + self._lookup.setdefault(context_id_bin, row) + return context_id_bin return None def clear(self) -> None: @@ -276,9 +277,9 @@ class ContextLookup: self._lookup.clear() self._memorize_new = False - def get(self, context_id: str) -> Row | EventAsRow | None: + def get(self, context_id_bin: bytes) -> Row | EventAsRow | None: """Get the context origin.""" - return self._lookup.get(context_id) + return self._lookup.get(context_id_bin) class ContextAugmenter: @@ -293,7 +294,7 @@ class ContextAugmenter: self.include_entity_name = logbook_run.include_entity_name def _get_context_row( - self, context_id: str | None, row: Row | EventAsRow + self, context_id: bytes | None, row: Row | EventAsRow ) -> Row | EventAsRow | None: """Get the context row from the id or row context.""" if context_id: @@ -305,11 +306,11 @@ class ContextAugmenter: return None def augment( - self, data: dict[str, Any], row: Row | EventAsRow, context_id: str | None + self, data: dict[str, Any], row: Row | EventAsRow, context_id: bytes | None ) -> None: """Augment data from the row and cache.""" - if context_user_id := row.context_user_id: - data[CONTEXT_USER_ID] = context_user_id + if context_user_id_bin := row.context_user_id_bin: + data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(context_user_id_bin) if not (context_row := self._get_context_row(context_id, row)): return @@ -317,11 +318,12 @@ class ContextAugmenter: if _rows_match(row, context_row): # This is the first event with the given ID. 
Was it directly caused by # a parent event? + context_parent_id_bin = row.context_parent_id_bin if ( - not row.context_parent_id + not context_parent_id_bin or ( context_row := self._get_context_row( - row.context_parent_id, context_row + context_parent_id_bin, context_row ) ) is None diff --git a/homeassistant/components/logbook/queries/__init__.py b/homeassistant/components/logbook/queries/__init__.py index 8a2ee40de4f..b88fd4842cd 100644 --- a/homeassistant/components/logbook/queries/__init__.py +++ b/homeassistant/components/logbook/queries/__init__.py @@ -6,6 +6,7 @@ from datetime import datetime as dt from sqlalchemy.sql.lambdas import StatementLambdaElement from homeassistant.components.recorder.filters import Filters +from homeassistant.components.recorder.models import ulid_to_bytes_or_none from homeassistant.helpers.json import json_dumps from homeassistant.util import dt as dt_util @@ -27,6 +28,7 @@ def statement_for_request( """Generate the logbook statement for a logbook request.""" start_day = dt_util.utc_to_timestamp(start_day_dt) end_day = dt_util.utc_to_timestamp(end_day_dt) + context_id_bin = ulid_to_bytes_or_none(context_id) # No entities: logbook sends everything for the timeframe # limited by the context_id and the yaml configured filter if not entity_ids and not device_ids: @@ -38,7 +40,7 @@ def statement_for_request( event_types, states_entity_filter, events_entity_filter, - context_id, + context_id_bin, ) # sqlalchemy caches object quoting, the diff --git a/homeassistant/components/logbook/queries/all.py b/homeassistant/components/logbook/queries/all.py index 729a4d2195a..5311d5a9d6e 100644 --- a/homeassistant/components/logbook/queries/all.py +++ b/homeassistant/components/logbook/queries/all.py @@ -26,28 +26,28 @@ def all_stmt( event_types: tuple[str, ...], states_entity_filter: ColumnElement | None = None, events_entity_filter: ColumnElement | None = None, - context_id: str | None = None, + context_id_bin: bytes | None = None, ) -> StatementLambdaElement: """Generate a logbook query for all entities.""" stmt = lambda_stmt( lambda: select_events_without_states(start_day, end_day, event_types) ) - if context_id is not None: + if context_id_bin is not None: # Once all the old `state_changed` events # are gone from the database remove the # _legacy_select_events_context_id() - stmt += lambda s: s.where(Events.context_id == context_id).union_all( + stmt += lambda s: s.where(Events.context_id_bin == context_id_bin).union_all( _states_query_for_context_id( start_day, end_day, # https://github.com/python/mypy/issues/2608 - context_id, # type:ignore[arg-type] + context_id_bin, # type:ignore[arg-type] ), legacy_select_events_context_id( start_day, end_day, # https://github.com/python/mypy/issues/2608 - context_id, # type:ignore[arg-type] + context_id_bin, # type:ignore[arg-type] ), ) else: @@ -76,12 +76,14 @@ def _apply_all_hints(sel: Select) -> Select: """Force mysql to use the right index on large selects.""" return sel.with_hint( States, f"FORCE INDEX ({LAST_UPDATED_INDEX_TS})", dialect_name="mysql" + ).with_hint( + States, f"FORCE INDEX ({LAST_UPDATED_INDEX_TS})", dialect_name="mariadb" ) def _states_query_for_context_id( - start_day: float, end_day: float, context_id: str + start_day: float, end_day: float, context_id_bin: bytes ) -> Select: return apply_states_filters(select_states(), start_day, end_day).where( - States.context_id == context_id + States.context_id_bin == context_id_bin ) diff --git a/homeassistant/components/logbook/queries/common.py 
b/homeassistant/components/logbook/queries/common.py index ca00f31615a..a0c8ddbdda2 100644 --- a/homeassistant/components/logbook/queries/common.py +++ b/homeassistant/components/logbook/queries/common.py @@ -10,11 +10,11 @@ from sqlalchemy.sql.expression import literal from sqlalchemy.sql.selectable import Select from homeassistant.components.recorder.db_schema import ( - EVENTS_CONTEXT_ID_INDEX, + EVENTS_CONTEXT_ID_BIN_INDEX, OLD_FORMAT_ATTRS_JSON, OLD_STATE, SHARED_ATTRS_JSON, - STATES_CONTEXT_ID_INDEX, + STATES_CONTEXT_ID_BIN_INDEX, EventData, Events, StateAttributes, @@ -47,9 +47,9 @@ EVENT_COLUMNS = ( Events.event_type.label("event_type"), Events.event_data.label("event_data"), Events.time_fired_ts.label("time_fired_ts"), - Events.context_id.label("context_id"), - Events.context_user_id.label("context_user_id"), - Events.context_parent_id.label("context_parent_id"), + Events.context_id_bin.label("context_id_bin"), + Events.context_user_id_bin.label("context_user_id_bin"), + Events.context_parent_id_bin.label("context_parent_id_bin"), ) STATE_COLUMNS = ( @@ -79,9 +79,9 @@ EVENT_COLUMNS_FOR_STATE_SELECT = ( ), literal(value=None, type_=sqlalchemy.Text).label("event_data"), States.last_updated_ts.label("time_fired_ts"), - States.context_id.label("context_id"), - States.context_user_id.label("context_user_id"), - States.context_parent_id.label("context_parent_id"), + States.context_id_bin.label("context_id_bin"), + States.context_user_id_bin.label("context_user_id_bin"), + States.context_parent_id_bin.label("context_parent_id_bin"), literal(value=None, type_=sqlalchemy.Text).label("shared_data"), ) @@ -113,7 +113,7 @@ def select_events_context_id_subquery( ) -> Select: """Generate the select for a context_id subquery.""" return ( - select(Events.context_id) + select(Events.context_id_bin) .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day)) .where(Events.event_type.in_(event_types)) .outerjoin(EventData, (Events.data_id == EventData.data_id)) @@ -162,7 +162,7 @@ def select_states() -> Select: def legacy_select_events_context_id( - start_day: float, end_day: float, context_id: str + start_day: float, end_day: float, context_id_bin: bytes ) -> Select: """Generate a legacy events context id select that also joins states.""" # This can be removed once we no longer have event_ids in the states table @@ -183,7 +183,7 @@ def legacy_select_events_context_id( StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day)) - .where(Events.context_id == context_id) + .where(Events.context_id_bin == context_id_bin) ) @@ -277,12 +277,16 @@ def _not_uom_attributes_matcher() -> BooleanClauseList: def apply_states_context_hints(sel: Select) -> Select: """Force mysql to use the right index on large context_id selects.""" return sel.with_hint( - States, f"FORCE INDEX ({STATES_CONTEXT_ID_INDEX})", dialect_name="mysql" + States, f"FORCE INDEX ({STATES_CONTEXT_ID_BIN_INDEX})", dialect_name="mysql" + ).with_hint( + States, f"FORCE INDEX ({STATES_CONTEXT_ID_BIN_INDEX})", dialect_name="mariadb" ) def apply_events_context_hints(sel: Select) -> Select: """Force mysql to use the right index on large context_id selects.""" return sel.with_hint( - Events, f"FORCE INDEX ({EVENTS_CONTEXT_ID_INDEX})", dialect_name="mysql" + Events, f"FORCE INDEX ({EVENTS_CONTEXT_ID_BIN_INDEX})", dialect_name="mysql" + ).with_hint( + Events, f"FORCE INDEX ({EVENTS_CONTEXT_ID_BIN_INDEX})", dialect_name="mariadb" ) diff --git 
a/homeassistant/components/logbook/queries/devices.py b/homeassistant/components/logbook/queries/devices.py index fa2deaf4c02..303313602df 100644 --- a/homeassistant/components/logbook/queries/devices.py +++ b/homeassistant/components/logbook/queries/devices.py @@ -36,7 +36,7 @@ def _select_device_id_context_ids_sub_query( inner = select_events_context_id_subquery(start_day, end_day, event_types).where( apply_event_device_id_matchers(json_quotable_device_ids) ) - return select(inner.c.context_id).group_by(inner.c.context_id) + return select(inner.c.context_id_bin).group_by(inner.c.context_id_bin) def _apply_devices_context_union( @@ -57,12 +57,12 @@ def _apply_devices_context_union( apply_events_context_hints( select_events_context_only() .select_from(devices_cte) - .outerjoin(Events, devices_cte.c.context_id == Events.context_id) + .outerjoin(Events, devices_cte.c.context_id_bin == Events.context_id_bin) ).outerjoin(EventData, (Events.data_id == EventData.data_id)), apply_states_context_hints( select_states_context_only() .select_from(devices_cte) - .outerjoin(States, devices_cte.c.context_id == States.context_id) + .outerjoin(States, devices_cte.c.context_id_bin == States.context_id_bin) ), ) diff --git a/homeassistant/components/logbook/queries/entities.py b/homeassistant/components/logbook/queries/entities.py index 3d26443ce90..2c095d1b051 100644 --- a/homeassistant/components/logbook/queries/entities.py +++ b/homeassistant/components/logbook/queries/entities.py @@ -42,13 +42,13 @@ def _select_entities_context_ids_sub_query( select_events_context_id_subquery(start_day, end_day, event_types).where( apply_event_entity_id_matchers(json_quoted_entity_ids) ), - apply_entities_hints(select(States.context_id)) + apply_entities_hints(select(States.context_id_bin)) .filter( (States.last_updated_ts > start_day) & (States.last_updated_ts < end_day) ) .where(States.entity_id.in_(entity_ids)), ) - return select(union.c.context_id).group_by(union.c.context_id) + return select(union.c.context_id_bin).group_by(union.c.context_id_bin) def _apply_entities_context_union( @@ -77,12 +77,12 @@ def _apply_entities_context_union( apply_events_context_hints( select_events_context_only() .select_from(entities_cte) - .outerjoin(Events, entities_cte.c.context_id == Events.context_id) + .outerjoin(Events, entities_cte.c.context_id_bin == Events.context_id_bin) ).outerjoin(EventData, (Events.data_id == EventData.data_id)), apply_states_context_hints( select_states_context_only() .select_from(entities_cte) - .outerjoin(States, entities_cte.c.context_id == States.context_id) + .outerjoin(States, entities_cte.c.context_id_bin == States.context_id_bin) ), ) @@ -138,4 +138,8 @@ def apply_entities_hints(sel: Select) -> Select: """Force mysql to use the right index on large selects.""" return sel.with_hint( States, f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX_TS})", dialect_name="mysql" + ).with_hint( + States, + f"FORCE INDEX ({ENTITY_ID_LAST_UPDATED_INDEX_TS})", + dialect_name="mariadb", ) diff --git a/homeassistant/components/logbook/queries/entities_and_devices.py b/homeassistant/components/logbook/queries/entities_and_devices.py index 43d11d0bdff..ec38dc7b6d8 100644 --- a/homeassistant/components/logbook/queries/entities_and_devices.py +++ b/homeassistant/components/logbook/queries/entities_and_devices.py @@ -41,13 +41,13 @@ def _select_entities_device_id_context_ids_sub_query( json_quoted_entity_ids, json_quoted_device_ids ) ), - apply_entities_hints(select(States.context_id)) + 
apply_entities_hints(select(States.context_id_bin)) .filter( (States.last_updated_ts > start_day) & (States.last_updated_ts < end_day) ) .where(States.entity_id.in_(entity_ids)), ) - return select(union.c.context_id).group_by(union.c.context_id) + return select(union.c.context_id_bin).group_by(union.c.context_id_bin) def _apply_entities_devices_context_union( @@ -77,12 +77,16 @@ def _apply_entities_devices_context_union( apply_events_context_hints( select_events_context_only() .select_from(devices_entities_cte) - .outerjoin(Events, devices_entities_cte.c.context_id == Events.context_id) + .outerjoin( + Events, devices_entities_cte.c.context_id_bin == Events.context_id_bin + ) ).outerjoin(EventData, (Events.data_id == EventData.data_id)), apply_states_context_hints( select_states_context_only() .select_from(devices_entities_cte) - .outerjoin(States, devices_entities_cte.c.context_id == States.context_id) + .outerjoin( + States, devices_entities_cte.c.context_id_bin == States.context_id_bin + ) ), ) diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 4e099a3b17f..7e3f08d7abd 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -89,6 +89,7 @@ from .tasks import ( ChangeStatisticsUnitTask, ClearStatisticsTask, CommitTask, + ContextIDMigrationTask, DatabaseLockTask, EventTask, ImportStatisticsTask, @@ -687,6 +688,7 @@ class Recorder(threading.Thread): _LOGGER.debug("Recorder processing the queue") self._adjust_lru_size() self.hass.add_job(self._async_set_recorder_ready_migration_done) + self.queue_task(ContextIDMigrationTask()) self._run_event_loop() self._shutdown() @@ -1146,6 +1148,10 @@ class Recorder(threading.Thread): """Run post schema migration tasks.""" migration.post_schema_migration(self, old_version, new_version) + def _migrate_context_ids(self) -> bool: + """Migrate context ids if needed.""" + return migration.migrate_context_ids(self) + def _send_keep_alive(self) -> None: """Send a keep alive to keep the db connection open.""" assert self.event_session is not None diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index 9a059c570c6..f794c64714e 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -21,6 +21,7 @@ from sqlalchemy import ( Identity, Index, Integer, + LargeBinary, SmallInteger, String, Text, @@ -55,8 +56,12 @@ from .models import ( StatisticData, StatisticDataTimestamp, StatisticMetaData, + bytes_to_ulid_or_none, + bytes_to_uuid_hex_or_none, datetime_to_timestamp_or_none, process_timestamp, + ulid_to_bytes_or_none, + uuid_hex_to_bytes_or_none, ) @@ -66,7 +71,7 @@ class Base(DeclarativeBase): """Base class for tables.""" -SCHEMA_VERSION = 35 +SCHEMA_VERSION = 36 _LOGGER = logging.getLogger(__name__) @@ -108,8 +113,9 @@ TABLES_TO_CHECK = [ LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts" ENTITY_ID_LAST_UPDATED_INDEX_TS = "ix_states_entity_id_last_updated_ts" -EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id" -STATES_CONTEXT_ID_INDEX = "ix_states_context_id" +EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin" +STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin" +CONTEXT_ID_BIN_MAX_LENGTH = 16 _DEFAULT_TABLE_ARGS = { "mysql_default_charset": "utf8mb4", @@ -174,6 +180,12 @@ class Events(Base): # Used for fetching events at a specific time # see logbook Index("ix_events_event_type_time_fired_ts", "event_type", "time_fired_ts"), + Index( + 
EVENTS_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), _DEFAULT_TABLE_ARGS, ) __tablename__ = TABLE_EVENTS @@ -190,18 +202,27 @@ class Events(Base): DATETIME_TYPE ) # no longer used for new rows time_fired_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True) - context_id: Mapped[str | None] = mapped_column( + context_id: Mapped[str | None] = mapped_column( # no longer used String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True ) - context_user_id: Mapped[str | None] = mapped_column( + context_user_id: Mapped[str | None] = mapped_column( # no longer used String(MAX_LENGTH_EVENT_CONTEXT_ID) ) - context_parent_id: Mapped[str | None] = mapped_column( + context_parent_id: Mapped[str | None] = mapped_column( # no longer used String(MAX_LENGTH_EVENT_CONTEXT_ID) ) data_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("event_data.data_id"), index=True ) + context_id_bin: Mapped[bytes | None] = mapped_column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH), + ) + context_user_id_bin: Mapped[bytes | None] = mapped_column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH), + ) + context_parent_id_bin: Mapped[bytes | None] = mapped_column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) event_data_rel: Mapped[EventData | None] = relationship("EventData") def __repr__(self) -> str: @@ -234,17 +255,20 @@ class Events(Base): origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin), time_fired=None, time_fired_ts=dt_util.utc_to_timestamp(event.time_fired), - context_id=event.context.id, - context_user_id=event.context.user_id, - context_parent_id=event.context.parent_id, + context_id=None, + context_id_bin=ulid_to_bytes_or_none(event.context.id), + context_user_id=None, + context_user_id_bin=uuid_hex_to_bytes_or_none(event.context.user_id), + context_parent_id=None, + context_parent_id_bin=ulid_to_bytes_or_none(event.context.parent_id), ) def to_native(self, validate_entity_id: bool = True) -> Event | None: """Convert to a native HA Event.""" context = Context( - id=self.context_id, - user_id=self.context_user_id, - parent_id=self.context_parent_id, + id=bytes_to_ulid_or_none(self.context_id_bin), + user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin), + parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin), ) try: return Event( @@ -316,6 +340,12 @@ class States(Base): # Used for fetching the state of entities at a specific time # (get_states in history.py) Index(ENTITY_ID_LAST_UPDATED_INDEX_TS, "entity_id", "last_updated_ts"), + Index( + STATES_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), _DEFAULT_TABLE_ARGS, ) __tablename__ = TABLE_STATES @@ -344,13 +374,13 @@ class States(Base): attributes_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("state_attributes.attributes_id"), index=True ) - context_id: Mapped[str | None] = mapped_column( + context_id: Mapped[str | None] = mapped_column( # no longer used String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True ) - context_user_id: Mapped[str | None] = mapped_column( + context_user_id: Mapped[str | None] = mapped_column( # no longer used String(MAX_LENGTH_EVENT_CONTEXT_ID) ) - context_parent_id: Mapped[str | None] = mapped_column( + context_parent_id: Mapped[str | None] = mapped_column( # no longer used String(MAX_LENGTH_EVENT_CONTEXT_ID) ) origin_idx: Mapped[int | None] = mapped_column( @@ -358,6 +388,15 @@ class States(Base): ) # 0 is local, 1 is remote old_state: Mapped[States |
None] = relationship("States", remote_side=[state_id]) state_attributes: Mapped[StateAttributes | None] = relationship("StateAttributes") + context_id_bin: Mapped[bytes | None] = mapped_column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH), + ) + context_user_id_bin: Mapped[bytes | None] = mapped_column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH), + ) + context_parent_id_bin: Mapped[bytes | None] = mapped_column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) def __repr__(self) -> str: """Return string representation of instance for debugging.""" @@ -388,9 +427,12 @@ class States(Base): dbstate = States( entity_id=entity_id, attributes=None, - context_id=event.context.id, - context_user_id=event.context.user_id, - context_parent_id=event.context.parent_id, + context_id=None, + context_id_bin=ulid_to_bytes_or_none(event.context.id), + context_user_id=None, + context_user_id_bin=uuid_hex_to_bytes_or_none(event.context.user_id), + context_parent_id=None, + context_parent_id_bin=ulid_to_bytes_or_none(event.context.parent_id), origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin), last_updated=None, last_changed=None, @@ -414,9 +456,9 @@ class States(Base): def to_native(self, validate_entity_id: bool = True) -> State | None: """Convert to an HA state object.""" context = Context( - id=self.context_id, - user_id=self.context_user_id, - parent_id=self.context_parent_id, + id=bytes_to_ulid_or_none(self.context_id_bin), + user_id=bytes_to_uuid_hex_or_none(self.context_user_id), + parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin), ) try: attrs = json_loads_object(self.attributes) if self.attributes else {} diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 0b8fe9243ba..e0f1163491e 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -7,9 +7,10 @@ from dataclasses import dataclass, replace as dataclass_replace from datetime import timedelta import logging from typing import TYPE_CHECKING, cast +from uuid import UUID import sqlalchemy -from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text +from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text, update from sqlalchemy.engine import CursorResult, Engine from sqlalchemy.exc import ( DatabaseError, @@ -24,20 +25,28 @@ from sqlalchemy.schema import AddConstraint, DropConstraint from sqlalchemy.sql.expression import true from homeassistant.core import HomeAssistant +from homeassistant.util.ulid import ulid_to_bytes from .const import SupportedDialect from .db_schema import ( + CONTEXT_ID_BIN_MAX_LENGTH, SCHEMA_VERSION, STATISTICS_TABLES, TABLE_STATES, Base, + Events, SchemaChanges, + States, Statistics, StatisticsMeta, StatisticsRuns, StatisticsShortTerm, ) from .models import process_timestamp +from .queries import ( + find_events_context_ids_to_migrate, + find_states_context_ids_to_migrate, +) from .statistics import ( correct_db_schema as statistics_correct_db_schema, delete_statistics_duplicates, @@ -56,7 +65,7 @@ if TYPE_CHECKING: from . import Recorder LIVE_MIGRATION_MIN_SCHEMA_VERSION = 0 - +_EMPTY_CONTEXT_ID = b"\x00" * 16 _LOGGER = logging.getLogger(__name__) @@ -219,7 +228,10 @@ def _create_index( def _drop_index( - session_maker: Callable[[], Session], table_name: str, index_name: str + session_maker: Callable[[], Session], + table_name: str, + index_name: str, + quiet: bool | None = None, ) -> None: """Drop an index from a specified table. 
@@ -282,33 +294,37 @@ def _drop_index( _LOGGER.debug( "Finished dropping index %s from table %s", index_name, table_name ) - else: - if index_name in ( - "ix_states_entity_id", - "ix_states_context_parent_id", - "ix_statistics_short_term_statistic_id_start", - "ix_statistics_statistic_id_start", - ): - # ix_states_context_parent_id was only there on nightly so we do not want - # to generate log noise or issues about it. - # - # ix_states_entity_id was only there for users who upgraded from schema - # version 8 or earlier. Newer installs will not have it so we do not - # want to generate log noise or issues about it. - # - # ix_statistics_short_term_statistic_id_start and ix_statistics_statistic_id_start - # were only there for users who upgraded from schema version 23 or earlier. - return + return - _LOGGER.warning( - ( - "Failed to drop index %s from table %s. Schema " - "Migration will continue; this is not a " - "critical operation" - ), - index_name, - table_name, - ) + if quiet: + return + + if index_name in ( + "ix_states_entity_id", + "ix_states_context_parent_id", + "ix_statistics_short_term_statistic_id_start", + "ix_statistics_statistic_id_start", + ): + # ix_states_context_parent_id was only there on nightly so we do not want + # to generate log noise or issues about it. + # + # ix_states_entity_id was only there for users who upgraded from schema + # version 8 or earlier. Newer installs will not have it so we do not + # want to generate log noise or issues about it. + # + # ix_statistics_short_term_statistic_id_start and ix_statistics_statistic_id_start + # were only there for users who upgraded from schema version 23 or earlier. + return + + _LOGGER.warning( + ( + "Failed to drop index %s from table %s. Schema " + "Migration will continue; this is not a " + "critical operation" + ), + index_name, + table_name, + ) def _add_columns( @@ -522,10 +538,15 @@ def _apply_update( # noqa: C901 """Perform operations to bring schema up to date.""" dialect = engine.dialect.name big_int = "INTEGER(20)" if dialect == SupportedDialect.MYSQL else "INTEGER" - if dialect in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL): + if dialect == SupportedDialect.MYSQL: timestamp_type = "DOUBLE PRECISION" + context_bin_type = f"BLOB({CONTEXT_ID_BIN_MAX_LENGTH})" + elif dialect == SupportedDialect.POSTGRESQL: + timestamp_type = "DOUBLE PRECISION" + context_bin_type = "BYTEA" else: timestamp_type = "FLOAT" + context_bin_type = "BLOB" if new_version == 1: # This used to create ix_events_time_fired, but it was removed in version 32 @@ -944,6 +965,19 @@ def _apply_update( # noqa: C901 ) # ix_statistics_start and ix_statistics_statistic_id_start are still used # for the post migration cleanup and can be removed in a future version.
+ elif new_version == 36: + for table in ("states", "events"): + _add_columns( + session_maker, + table, + [ + f"context_id_bin {context_bin_type}", + f"context_user_id_bin {context_bin_type}", + f"context_parent_id_bin {context_bin_type}", + ], + ) + _create_index(session_maker, "events", "ix_events_context_id_bin") + _create_index(session_maker, "states", "ix_states_context_id_bin") else: raise ValueError(f"No schema migration defined for version {new_version}") @@ -1193,6 +1227,67 @@ def _migrate_statistics_columns_to_timestamp( ) +def _context_id_to_bytes(context_id: str | None) -> bytes | None: + """Convert a context_id to bytes.""" + if context_id is None: + return None + if len(context_id) == 32: + return UUID(context_id).bytes + if len(context_id) == 26: + return ulid_to_bytes(context_id) + return None + + +def migrate_context_ids(instance: Recorder) -> bool: + """Migrate context_ids to use binary format.""" + _to_bytes = _context_id_to_bytes + session_maker = instance.get_session + _LOGGER.debug("Migrating context_ids to binary format") + with session_scope(session=session_maker()) as session: + if events := session.execute(find_events_context_ids_to_migrate()).all(): + session.execute( + update(Events), + [ + { + "event_id": event_id, + "context_id": None, + "context_id_bin": _to_bytes(context_id) or _EMPTY_CONTEXT_ID, + "context_user_id": None, + "context_user_id_bin": _to_bytes(context_user_id), + "context_parent_id": None, + "context_parent_id_bin": _to_bytes(context_parent_id), + } + for event_id, context_id, context_user_id, context_parent_id in events + ], + ) + if states := session.execute(find_states_context_ids_to_migrate()).all(): + session.execute( + update(States), + [ + { + "state_id": state_id, + "context_id": None, + "context_id_bin": _to_bytes(context_id) or _EMPTY_CONTEXT_ID, + "context_user_id": None, + "context_user_id_bin": _to_bytes(context_user_id), + "context_parent_id": None, + "context_parent_id_bin": _to_bytes(context_parent_id), + } + for state_id, context_id, context_user_id, context_parent_id in states + ], + ) + # If there is more work to do return False + # so that we can be called again + is_done = not (events or states) + + if is_done: + _drop_index(session_maker, "events", "ix_events_context_id", quiet=True) + _drop_index(session_maker, "states", "ix_states_context_id", quiet=True) + + _LOGGER.debug("Migrating context_ids to binary format: done=%s", is_done) + return is_done + + def _initialize_database(session: Session) -> bool: """Initialize a new database. 
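Reviewer note on the conversion above: `_context_id_to_bytes` is the heart of the migration. Legacy rows carry either a 32-character UUID hex (old-style context ids) or a 26-character ULID in the text columns, and both encode exactly 128 bits, so each normalizes to a 16-byte value for the new `*_bin` columns; anything else is treated as invalid and the call site substitutes `_EMPTY_CONTEXT_ID`. A minimal self-contained sketch of that dispatch-by-length logic follows. The Crockford base32 decoder is a hand-rolled stand-in for `ulid_transform.ulid_to_bytes` (no validation), included only so the example runs without the dependency:

```python
from uuid import UUID

# Stand-in for ulid_transform.ulid_to_bytes: decode 26 Crockford base32
# characters (alphabet excludes I, L, O, U) into 128 big-endian bits.
_CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"


def _ulid_to_bytes(ulid: str) -> bytes:
    value = 0
    for char in ulid:
        value = (value << 5) | _CROCKFORD.index(char)
    return value.to_bytes(16, "big")


def context_id_to_bytes(context_id: str | None) -> bytes | None:
    """Mirror migration._context_id_to_bytes: dispatch on string length."""
    if context_id is None:
        return None
    if len(context_id) == 32:  # old uuid-hex context id
        return UUID(context_id).bytes
    if len(context_id) == 26:  # ulid context id
        return _ulid_to_bytes(context_id)
    return None  # invalid; caller falls back to _EMPTY_CONTEXT_ID


# Both formats collapse to 16 bytes, which is why LargeBinary(16) suffices:
assert context_id_to_bytes("9400facee45711eaa9308bfd3d19e474") == bytes.fromhex(
    "9400facee45711eaa9308bfd3d19e474"
)
assert len(context_id_to_bytes("01ARZ3NDEKTSV4RRFFQ69G5FAV")) == 16
assert context_id_to_bytes("invalid") is None
```
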
diff --git a/homeassistant/components/recorder/models.py b/homeassistant/components/recorder/models.py index acdf61743f9..053c870d8a0 100644 --- a/homeassistant/components/recorder/models.py +++ b/homeassistant/components/recorder/models.py @@ -1,10 +1,13 @@ """Models for Recorder.""" from __future__ import annotations +from contextlib import suppress from dataclasses import dataclass from datetime import datetime, timedelta +from functools import lru_cache import logging from typing import Any, Literal, TypedDict, overload +from uuid import UUID from awesomeversion import AwesomeVersion from sqlalchemy.engine.row import Row @@ -18,6 +21,7 @@ from homeassistant.const import ( from homeassistant.core import Context, State import homeassistant.util.dt as dt_util from homeassistant.util.json import json_loads_object +from homeassistant.util.ulid import bytes_to_ulid, ulid_to_bytes from .const import SupportedDialect @@ -155,6 +159,40 @@ def timestamp_to_datetime_or_none(ts: float | None) -> datetime | None: return dt_util.utc_from_timestamp(ts) +def ulid_to_bytes_or_none(ulid: str | None) -> bytes | None: + """Convert an ulid to bytes.""" + if ulid is None: + return None + return ulid_to_bytes(ulid) + + +def bytes_to_ulid_or_none(_bytes: bytes | None) -> str | None: + """Convert bytes to a ulid.""" + if _bytes is None: + return None + return bytes_to_ulid(_bytes) + + +@lru_cache(maxsize=16) +def uuid_hex_to_bytes_or_none(uuid_hex: str | None) -> bytes | None: + """Convert a uuid hex to bytes.""" + if uuid_hex is None: + return None + with suppress(ValueError): + return UUID(hex=uuid_hex).bytes + return None + + +@lru_cache(maxsize=16) +def bytes_to_uuid_hex_or_none(_bytes: bytes | None) -> str | None: + """Convert bytes to a uuid hex.""" + if _bytes is None: + return None + with suppress(ValueError): + return UUID(bytes=_bytes).hex + return None + + class LazyStatePreSchema31(State): """A lazy version of core State before schema 31.""" diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index d93a6b0d62b..217b7ed11bb 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -667,3 +667,31 @@ def find_legacy_row() -> StatementLambdaElement: # https://github.com/sqlalchemy/sqlalchemy/issues/9189 # pylint: disable-next=not-callable return lambda_stmt(lambda: select(func.max(States.event_id))) + + +def find_events_context_ids_to_migrate() -> StatementLambdaElement: + """Find events context_ids to migrate.""" + return lambda_stmt( + lambda: select( + Events.event_id, + Events.context_id, + Events.context_user_id, + Events.context_parent_id, + ) + .filter(Events.context_id_bin.is_(None)) + .limit(SQLITE_MAX_BIND_VARS) + ) + + +def find_states_context_ids_to_migrate() -> StatementLambdaElement: + """Find states context_ids to migrate.""" + return lambda_stmt( + lambda: select( + States.state_id, + States.context_id, + States.context_user_id, + States.context_parent_id, + ) + .filter(States.context_id_bin.is_(None)) + .limit(SQLITE_MAX_BIND_VARS) + ) diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index c8ad1aeb897..37a02772572 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -6,6 +6,7 @@ import asyncio from collections.abc import Callable, Iterable from dataclasses import dataclass from datetime import datetime +import logging import threading from typing import TYPE_CHECKING, Any @@
-18,6 +19,9 @@ from .db_schema import Statistics, StatisticsShortTerm from .models import StatisticData, StatisticMetaData from .util import periodic_db_cleanups +_LOGGER = logging.getLogger(__name__) + + if TYPE_CHECKING: from .core import Recorder @@ -339,3 +343,16 @@ class AdjustLRUSizeTask(RecorderTask): def run(self, instance: Recorder) -> None: """Handle the task to adjust the size.""" instance._adjust_lru_size() # pylint: disable=[protected-access] + + +@dataclass +class ContextIDMigrationTask(RecorderTask): + """An object to insert into the recorder queue to migrate context ids.""" + + commit_before = False + + def run(self, instance: Recorder) -> None: + """Run context id migration task.""" + if not instance._migrate_context_ids(): # pylint: disable=[protected-access] + # Schedule a new migration task if this one didn't finish + instance.queue_task(ContextIDMigrationTask()) diff --git a/homeassistant/util/ulid.py b/homeassistant/util/ulid.py index 304a42ec610..643286cedb9 100644 --- a/homeassistant/util/ulid.py +++ b/homeassistant/util/ulid.py @@ -3,9 +3,9 @@ from __future__ import annotations import time -from ulid_transform import ulid_at_time, ulid_hex +from ulid_transform import bytes_to_ulid, ulid_at_time, ulid_hex, ulid_to_bytes -__all__ = ["ulid", "ulid_hex", "ulid_at_time"] +__all__ = ["ulid", "ulid_hex", "ulid_at_time", "ulid_to_bytes", "bytes_to_ulid"] def ulid(timestamp: float | None = None) -> str: diff --git a/tests/components/logbook/common.py b/tests/components/logbook/common.py index e6bce9e6fbc..d08366e2f1b 100644 --- a/tests/components/logbook/common.py +++ b/tests/components/logbook/common.py @@ -6,7 +6,11 @@ from typing import Any from homeassistant.components import logbook from homeassistant.components.logbook import processor -from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat +from homeassistant.components.recorder.models import ( + process_timestamp_to_utc_isoformat, + ulid_to_bytes_or_none, + uuid_hex_to_bytes_or_none, +) from homeassistant.core import Context from homeassistant.helpers import entity_registry as er from homeassistant.helpers.json import JSONEncoder @@ -28,9 +32,13 @@ class MockRow: self.data = data self.time_fired = dt_util.utcnow() self.time_fired_ts = dt_util.utc_to_timestamp(self.time_fired) - self.context_parent_id = context.parent_id if context else None - self.context_user_id = context.user_id if context else None - self.context_id = context.id if context else None + self.context_parent_id_bin = ( + ulid_to_bytes_or_none(context.parent_id) if context else None + ) + self.context_user_id_bin = ( + uuid_hex_to_bytes_or_none(context.user_id) if context else None + ) + self.context_id_bin = ulid_to_bytes_or_none(context.id) if context else None self.state = None self.entity_id = None self.state_id = None diff --git a/tests/components/logbook/test_init.py b/tests/components/logbook/test_init.py index bb83c1fdb5c..a3e240f682f 100644 --- a/tests/components/logbook/test_init.py +++ b/tests/components/logbook/test_init.py @@ -323,9 +323,9 @@ def create_state_changed_event_from_old_new( "event_data", "time_fired", "time_fired_ts", - "context_id", - "context_user_id", - "context_parent_id", + "context_id_bin", + "context_user_id_bin", + "context_parent_id_bin", "state", "entity_id", "domain", @@ -349,12 +349,12 @@ def create_state_changed_event_from_old_new( row.entity_id = entity_id row.domain = entity_id and ha.split_entity_id(entity_id)[0] row.context_only = False - row.context_id = None + 
row.context_id_bin = None row.friendly_name = None row.icon = None row.old_format_icon = None - row.context_user_id = None - row.context_parent_id = None + row.context_user_id_bin = None + row.context_parent_id_bin = None row.old_state_id = old_state and 1 row.state_id = new_state and 1 return LazyEventPartialState(row, {}) @@ -966,7 +966,7 @@ async def test_logbook_entity_context_id( await async_recorder_block_till_done(hass) context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -1027,7 +1027,7 @@ async def test_logbook_entity_context_id( # A service call light_turn_off_service_context = ha.Context( - id="9c5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVBFC", user_id="9400facee45711eaa9308bfd3d19e474", ) hass.states.async_set("light.switch", STATE_ON) @@ -1120,7 +1120,7 @@ async def test_logbook_context_id_automation_script_started_manually( # An Automation automation_entity_id_test = "automation.alarm" automation_context = ha.Context( - id="fc5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVCCC", user_id="f400facee45711eaa9308bfd3d19e474", ) hass.bus.async_fire( @@ -1129,7 +1129,7 @@ async def test_logbook_context_id_automation_script_started_manually( context=automation_context, ) script_context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) hass.bus.async_fire( @@ -1141,7 +1141,7 @@ async def test_logbook_context_id_automation_script_started_manually( hass.bus.async_fire(EVENT_HOMEASSISTANT_START) script_2_context = ha.Context( - id="1234", + id="01GTDGKBCH00GW0X476W5TVEEE", user_id="b400facee45711eaa9308bfd3d19e474", ) hass.bus.async_fire( @@ -1172,12 +1172,12 @@ async def test_logbook_context_id_automation_script_started_manually( assert json_dict[0]["entity_id"] == "automation.alarm" assert "context_entity_id" not in json_dict[0] assert json_dict[0]["context_user_id"] == "f400facee45711eaa9308bfd3d19e474" - assert json_dict[0]["context_id"] == "fc5bd62de45711eaaeb351041eec8dd9" + assert json_dict[0]["context_id"] == "01GTDGKBCH00GW0X476W5TVCCC" assert json_dict[1]["entity_id"] == "script.mock_script" assert "context_entity_id" not in json_dict[1] assert json_dict[1]["context_user_id"] == "b400facee45711eaa9308bfd3d19e474" - assert json_dict[1]["context_id"] == "ac5bd62de45711eaaeb351041eec8dd9" + assert json_dict[1]["context_id"] == "01GTDGKBCH00GW0X476W5TVAAA" assert json_dict[2]["domain"] == "homeassistant" @@ -1185,7 +1185,7 @@ async def test_logbook_context_id_automation_script_started_manually( assert json_dict[3]["name"] == "Mock script" assert "context_entity_id" not in json_dict[1] assert json_dict[3]["context_user_id"] == "b400facee45711eaa9308bfd3d19e474" - assert json_dict[3]["context_id"] == "1234" + assert json_dict[3]["context_id"] == "01GTDGKBCH00GW0X476W5TVEEE" assert json_dict[4]["entity_id"] == "switch.new" assert json_dict[4]["state"] == "off" @@ -1209,7 +1209,7 @@ async def test_logbook_entity_context_parent_id( await async_recorder_block_till_done(hass) context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -1222,8 +1222,8 @@ async def test_logbook_entity_context_parent_id( ) child_context = ha.Context( - id="2798bfedf8234b5e9f4009c91f48f30c", - parent_id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVDDD", + parent_id="01GTDGKBCH00GW0X476W5TVAAA", 
user_id="b400facee45711eaa9308bfd3d19e474", ) hass.bus.async_fire( @@ -1274,8 +1274,8 @@ async def test_logbook_entity_context_parent_id( # A state change via service call with the script as the parent light_turn_off_service_context = ha.Context( - id="9c5bd62de45711eaaeb351041eec8dd9", - parent_id="2798bfedf8234b5e9f4009c91f48f30c", + id="01GTDGKBCH00GW0X476W5TVBFC", + parent_id="01GTDGKBCH00GW0X476W5TVDDD", user_id="9400facee45711eaa9308bfd3d19e474", ) hass.states.async_set("light.switch", STATE_ON) @@ -1299,8 +1299,8 @@ async def test_logbook_entity_context_parent_id( # An event with a parent event, but the parent event isn't available missing_parent_context = ha.Context( - id="fc40b9a0d1f246f98c34b33c76228ee6", - parent_id="c8ce515fe58e442f8664246c65ed964f", + id="01GTDGKBCH00GW0X476W5TEDDD", + parent_id="01GTDGKBCH00GW0X276W5TEDDD", user_id="485cacf93ef84d25a99ced3126b921d2", ) logbook.async_log_entry( @@ -1423,7 +1423,7 @@ async def test_logbook_context_from_template( await hass.async_block_till_done() switch_turn_off_context = ha.Context( - id="9c5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVBFC", user_id="9400facee45711eaa9308bfd3d19e474", ) hass.states.async_set( @@ -1506,7 +1506,7 @@ async def test_logbook_( await hass.async_block_till_done() switch_turn_off_context = ha.Context( - id="9c5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVBFC", user_id="9400facee45711eaa9308bfd3d19e474", ) hass.states.async_set( @@ -1692,7 +1692,7 @@ async def test_logbook_multiple_entities( await hass.async_block_till_done() switch_turn_off_context = ha.Context( - id="9c5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVBFC", user_id="9400facee45711eaa9308bfd3d19e474", ) hass.states.async_set( @@ -2394,7 +2394,7 @@ async def test_get_events( hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400}) await hass.async_block_till_done() context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -2474,7 +2474,7 @@ async def test_get_events( "id": 5, "type": "logbook/get_events", "start_time": now.isoformat(), - "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_id": "01GTDGKBCH00GW0X476W5TVAAA", } ) response = await client.receive_json() @@ -2651,7 +2651,7 @@ async def test_get_events_with_device_ids( hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400}) await hass.async_block_till_done() context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -2740,7 +2740,7 @@ async def test_logbook_select_entities_context_id( await async_recorder_block_till_done(hass) context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -2799,7 +2799,7 @@ async def test_logbook_select_entities_context_id( # A service call light_turn_off_service_context = ha.Context( - id="9c5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVBFC", user_id="9400facee45711eaa9308bfd3d19e474", ) hass.states.async_set("light.switch", STATE_ON) @@ -2880,7 +2880,7 @@ async def test_get_events_with_context_state( hass.states.async_set("light.kitchen2", STATE_OFF) context = ha.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X476W5TVAAA", user_id="b400facee45711eaa9308bfd3d19e474", ) hass.states.async_set("binary_sensor.is_light", STATE_OFF, context=context) diff --git 
a/tests/components/logbook/test_websocket_api.py b/tests/components/logbook/test_websocket_api.py index 6b21c66de8c..9b0b3f2221e 100644 --- a/tests/components/logbook/test_websocket_api.py +++ b/tests/components/logbook/test_websocket_api.py @@ -159,7 +159,7 @@ async def test_get_events( hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400}) await hass.async_block_till_done() context = core.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X276W5TEDDD", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -239,7 +239,7 @@ async def test_get_events( "id": 5, "type": "logbook/get_events", "start_time": now.isoformat(), - "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_id": "01GTDGKBCH00GW0X276W5TEDDD", } ) response = await client.receive_json() @@ -448,7 +448,7 @@ async def test_get_events_with_device_ids( hass.states.async_set("light.kitchen", STATE_ON, {"brightness": 400}) await hass.async_block_till_done() context = core.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X276W5TEDDD", user_id="b400facee45711eaa9308bfd3d19e474", ) @@ -1262,7 +1262,7 @@ async def test_subscribe_unsubscribe_logbook_stream( ] context = core.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X276W5TEDDD", user_id="b400facee45711eaa9308bfd3d19e474", ) automation_entity_id_test = "automation.alarm" @@ -1300,7 +1300,7 @@ async def test_subscribe_unsubscribe_logbook_stream( assert msg["type"] == "event" assert msg["event"]["events"] == [ { - "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_id": "01GTDGKBCH00GW0X276W5TEDDD", "context_user_id": "b400facee45711eaa9308bfd3d19e474", "domain": "automation", "entity_id": "automation.alarm", @@ -1313,7 +1313,7 @@ async def test_subscribe_unsubscribe_logbook_stream( "context_domain": "automation", "context_entity_id": "automation.alarm", "context_event_type": "automation_triggered", - "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_id": "01GTDGKBCH00GW0X276W5TEDDD", "context_message": "triggered by state of binary_sensor.dog_food_ready", "context_name": "Mock automation", "context_source": "state of binary_sensor.dog_food_ready", @@ -1365,7 +1365,7 @@ async def test_subscribe_unsubscribe_logbook_stream( "context_domain": "automation", "context_entity_id": "automation.alarm", "context_event_type": "automation_triggered", - "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_id": "01GTDGKBCH00GW0X276W5TEDDD", "context_message": "triggered by state of binary_sensor.dog_food_ready", "context_name": "Mock automation", "context_source": "state of binary_sensor.dog_food_ready", @@ -1395,7 +1395,7 @@ async def test_subscribe_unsubscribe_logbook_stream( "context_domain": "automation", "context_entity_id": "automation.alarm", "context_event_type": "automation_triggered", - "context_id": "ac5bd62de45711eaaeb351041eec8dd9", + "context_id": "01GTDGKBCH00GW0X276W5TEDDD", "context_message": "triggered by state of binary_sensor.dog_food_ready", "context_name": "Mock automation", "context_source": "state of binary_sensor.dog_food_ready", @@ -1990,7 +1990,7 @@ async def test_logbook_stream_match_multiple_entities( hass.states.async_set("binary_sensor.should_not_appear", STATE_ON) hass.states.async_set("binary_sensor.should_not_appear", STATE_OFF) context = core.Context( - id="ac5bd62de45711eaaeb351041eec8dd9", + id="01GTDGKBCH00GW0X276W5TEDDD", user_id="b400facee45711eaa9308bfd3d19e474", ) hass.bus.async_fire( diff --git 
a/tests/components/recorder/db_schema_23_with_newer_columns.py b/tests/components/recorder/db_schema_23_with_newer_columns.py index d63e8d59d25..0cd3f414901 100644 --- a/tests/components/recorder/db_schema_23_with_newer_columns.py +++ b/tests/components/recorder/db_schema_23_with_newer_columns.py @@ -27,6 +27,7 @@ from sqlalchemy import ( Identity, Index, Integer, + LargeBinary, SmallInteger, String, Text, @@ -92,6 +93,10 @@ DOUBLE_TYPE = ( TIMESTAMP_TYPE = DOUBLE_TYPE +CONTEXT_ID_BIN_MAX_LENGTH = 16 +EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin" +STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin" + class Events(Base): # type: ignore """Event history data.""" @@ -100,6 +105,12 @@ class Events(Base): # type: ignore # Used for fetching events at a specific time # see logbook Index("ix_events_event_type_time_fired", "event_type", "time_fired"), + Index( + EVENTS_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, ) __tablename__ = TABLE_EVENTS @@ -121,6 +132,15 @@ class Events(Base): # type: ignore data_id = Column( Integer, ForeignKey("event_data.data_id"), index=True ) # *** Not originally in v23, only added for recorder to startup ok + context_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v23, only added for recorder to startup ok + context_user_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v23, only added for recorder to startup ok + context_parent_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v23, only added for recorder to startup ok event_data_rel = relationship( "EventData" ) # *** Not originally in v23, only added for recorder to startup ok @@ -191,6 +211,12 @@ class States(Base): # type: ignore # Used for fetching the state of entities at a specific time # (get_states in history.py) Index("ix_states_entity_id_last_updated", "entity_id", "last_updated"), + Index( + STATES_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, ) __tablename__ = TABLE_STATES @@ -212,6 +238,15 @@ class States(Base): # type: ignore ) # *** Not originally in v23, only added for recorder to startup ok created = Column(DATETIME_TYPE, default=dt_util.utcnow) old_state_id = Column(Integer, ForeignKey("states.state_id"), index=True) + context_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v23, only added for recorder to startup ok + context_user_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v23, only added for recorder to startup ok + context_parent_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v23, only added for recorder to startup ok event = relationship("Events", uselist=False) old_state = relationship("States", remote_side=[state_id]) diff --git a/tests/components/recorder/db_schema_30.py b/tests/components/recorder/db_schema_30.py index 91f7593969a..7862ad06142 100644 --- a/tests/components/recorder/db_schema_30.py +++ b/tests/components/recorder/db_schema_30.py @@ -23,6 +23,7 @@ from sqlalchemy import ( Identity, Index, Integer, + LargeBinary, SmallInteger, String, Text, @@ -96,6 +97,9 @@ LAST_UPDATED_INDEX = "ix_states_last_updated" 
ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated" EVENTS_CONTEXT_ID_INDEX = "ix_events_context_id" STATES_CONTEXT_ID_INDEX = "ix_states_context_id" +CONTEXT_ID_BIN_MAX_LENGTH = 16 +EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin" +STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin" class FAST_PYSQLITE_DATETIME(sqlite.DATETIME): # type: ignore[misc] @@ -193,6 +197,12 @@ class Events(Base): # type: ignore[misc,valid-type] # Used for fetching events at a specific time # see logbook Index("ix_events_event_type_time_fired", "event_type", "time_fired"), + Index( + EVENTS_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, ) __tablename__ = TABLE_EVENTS @@ -206,6 +216,15 @@ class Events(Base): # type: ignore[misc,valid-type] context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True) + context_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v30, only added for recorder to startup ok + context_user_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v30, only added for recorder to startup ok + context_parent_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v30, only added for recorder to startup ok event_data_rel = relationship("EventData") def __repr__(self) -> str: @@ -310,6 +329,12 @@ class States(Base): # type: ignore[misc,valid-type] # Used for fetching the state of entities at a specific time # (get_states in history.py) Index(ENTITY_ID_LAST_UPDATED_INDEX, "entity_id", "last_updated"), + Index( + STATES_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, ) __tablename__ = TABLE_STATES @@ -332,6 +357,15 @@ class States(Base): # type: ignore[misc,valid-type] context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) origin_idx = Column(SmallInteger) # 0 is local, 1 is remote + context_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v30, only added for recorder to startup ok + context_user_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v30, only added for recorder to startup ok + context_parent_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v30, only added for recorder to startup ok old_state = relationship("States", remote_side=[state_id]) state_attributes = relationship("StateAttributes") diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 19c7e6c6955..730d90e14ba 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -6,9 +6,10 @@ import sqlite3 import sys import threading from unittest.mock import Mock, PropertyMock, call, patch +import uuid import pytest -from sqlalchemy import create_engine, text +from sqlalchemy import create_engine, inspect, text from sqlalchemy.exc import ( DatabaseError, InternalError, @@ -23,17 +24,25 @@ from homeassistant.components import persistent_notification as pn, recorder from
 from homeassistant.components.recorder import db_schema, migration
 from homeassistant.components.recorder.db_schema import (
     SCHEMA_VERSION,
+    Events,
     RecorderRuns,
     States,
 )
+from homeassistant.components.recorder.tasks import ContextIDMigrationTask
 from homeassistant.components.recorder.util import session_scope
 from homeassistant.core import HomeAssistant
 from homeassistant.helpers import recorder as recorder_helper
 import homeassistant.util.dt as dt_util
+from homeassistant.util.ulid import bytes_to_ulid
 
-from .common import async_wait_recording_done, create_engine_test
+from .common import (
+    async_recorder_block_till_done,
+    async_wait_recording_done,
+    create_engine_test,
+)
 from tests.common import async_fire_time_changed
+from tests.typing import RecorderInstanceGenerator
 
 ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
@@ -535,3 +544,147 @@ def test_raise_if_exception_missing_empty_cause_str() -> None:
     with pytest.raises(ProgrammingError):
         migration.raise_if_exception_missing_str(programming_exc, ["not present"])
+
+
+@pytest.mark.parametrize("enable_migrate_context_ids", [True])
+async def test_migrate_context_ids(
+    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
+) -> None:
+    """Test we can migrate old uuid context ids and ulid context ids to binary format."""
+    instance = await async_setup_recorder_instance(hass)
+    await async_wait_recording_done(hass)
+
+    test_uuid = uuid.uuid4()
+    uuid_hex = test_uuid.hex
+    uuid_bin = test_uuid.bytes
+
+    def _insert_events():
+        with session_scope(hass=hass) as session:
+            session.add_all(
+                (
+                    Events(
+                        event_type="old_uuid_context_id_event",
+                        event_data=None,
+                        origin_idx=0,
+                        time_fired=None,
+                        time_fired_ts=1677721632.452529,
+                        context_id=uuid_hex,
+                        context_id_bin=None,
+                        context_user_id=None,
+                        context_user_id_bin=None,
+                        context_parent_id=None,
+                        context_parent_id_bin=None,
+                    ),
+                    Events(
+                        event_type="empty_context_id_event",
+                        event_data=None,
+                        origin_idx=0,
+                        time_fired=None,
+                        time_fired_ts=1677721632.552529,
+                        context_id=None,
+                        context_id_bin=None,
+                        context_user_id=None,
+                        context_user_id_bin=None,
+                        context_parent_id=None,
+                        context_parent_id_bin=None,
+                    ),
+                    Events(
+                        event_type="ulid_context_id_event",
+                        event_data=None,
+                        origin_idx=0,
+                        time_fired=None,
+                        time_fired_ts=1677721632.552529,
+                        context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
+                        context_id_bin=None,
+                        context_user_id="9400facee45711eaa9308bfd3d19e474",
+                        context_user_id_bin=None,
+                        context_parent_id="01ARZ3NDEKTSV4RRFFQ69G5FA2",
+                        context_parent_id_bin=None,
+                    ),
+                    Events(
+                        event_type="invalid_context_id_event",
+                        event_data=None,
+                        origin_idx=0,
+                        time_fired=None,
+                        time_fired_ts=1677721632.552529,
+                        context_id="invalid",
+                        context_id_bin=None,
+                        context_user_id=None,
+                        context_user_id_bin=None,
+                        context_parent_id=None,
+                        context_parent_id_bin=None,
+                    ),
+                )
+            )
+
+    await instance.async_add_executor_job(_insert_events)
+
+    await async_wait_recording_done(hass)
+    # This is a threadsafe way to add a task to the recorder
+    instance.queue_task(ContextIDMigrationTask())
+    await async_recorder_block_till_done(hass)
+
+    def _object_as_dict(obj):
+        return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
+
+    def _fetch_migrated_events():
+        with session_scope(hass=hass) as session:
+            events = (
+                session.query(Events)
+                .filter(
+                    Events.event_type.in_(
+                        [
+                            "old_uuid_context_id_event",
+                            "empty_context_id_event",
+                            "ulid_context_id_event",
+                            "invalid_context_id_event",
+                        ]
+                    )
+                )
+                .all()
+            )
+            assert len(events) == 4
+            return {event.event_type: _object_as_dict(event) for event in events}
+
+    events_by_type = await instance.async_add_executor_job(_fetch_migrated_events)
+
+    old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
+    assert old_uuid_context_id_event["context_id"] is None
+    assert old_uuid_context_id_event["context_user_id"] is None
+    assert old_uuid_context_id_event["context_parent_id"] is None
+    assert old_uuid_context_id_event["context_id_bin"] == uuid_bin
+    assert old_uuid_context_id_event["context_user_id_bin"] is None
+    assert old_uuid_context_id_event["context_parent_id_bin"] is None
+
+    empty_context_id_event = events_by_type["empty_context_id_event"]
+    assert empty_context_id_event["context_id"] is None
+    assert empty_context_id_event["context_user_id"] is None
+    assert empty_context_id_event["context_parent_id"] is None
+    assert empty_context_id_event["context_id_bin"] == b"\x00" * 16
+    assert empty_context_id_event["context_user_id_bin"] is None
+    assert empty_context_id_event["context_parent_id_bin"] is None
+
+    ulid_context_id_event = events_by_type["ulid_context_id_event"]
+    assert ulid_context_id_event["context_id"] is None
+    assert ulid_context_id_event["context_user_id"] is None
+    assert ulid_context_id_event["context_parent_id"] is None
+    assert (
+        bytes_to_ulid(ulid_context_id_event["context_id_bin"])
+        == "01ARZ3NDEKTSV4RRFFQ69G5FAV"
+    )
+    assert (
+        ulid_context_id_event["context_user_id_bin"]
+        == b"\x94\x00\xfa\xce\xe4W\x11\xea\xa90\x8b\xfd=\x19\xe4t"
+    )
+    assert (
+        bytes_to_ulid(ulid_context_id_event["context_parent_id_bin"])
+        == "01ARZ3NDEKTSV4RRFFQ69G5FA2"
+    )
+
+    invalid_context_id_event = events_by_type["invalid_context_id_event"]
+    assert invalid_context_id_event["context_id"] is None
+    assert invalid_context_id_event["context_user_id"] is None
+    assert invalid_context_id_event["context_parent_id"] is None
+    assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16
+    assert invalid_context_id_event["context_user_id_bin"] is None
+    assert invalid_context_id_event["context_parent_id_bin"] is None
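Taken together, the four events pin down the per-row transform the migration must apply: a 32-character hex string converts via uuid.UUID(...).bytes, a 26-character ULID via ulid_to_bytes, and anything missing or unparseable collapses to a 16-byte zero sentinel. A rough sketch of that mapping, inferred only from the assertions above (the real logic lives in Recorder._migrate_context_ids, which is not part of this diff):

import uuid

from homeassistant.util.ulid import ulid_to_bytes


def expected_context_id_bin(context_id: str | None) -> bytes:
    """Per-row transform implied by the test assertions (sketch only)."""
    if context_id and len(context_id) == 26:
        # ULID, e.g. "01ARZ3NDEKTSV4RRFFQ69G5FAV" -> 16 bytes
        return ulid_to_bytes(context_id)
    if context_id and len(context_id) == 32:
        # hex UUID, e.g. uuid.uuid4().hex -> 16 bytes
        return uuid.UUID(context_id).bytes
    # None, empty, or invalid ids become the all-zero sentinel
    return b"\x00" * 16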
diff --git a/tests/conftest.py b/tests/conftest.py
index 9c1eef3ffbd..ed5a95f1b25 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1138,6 +1138,16 @@ def enable_nightly_purge() -> bool:
     return False
 
 
+@pytest.fixture
+def enable_migrate_context_ids() -> bool:
+    """Fixture to control enabling of recorder's context id migration.
+
+    To enable context id migration, tests can be marked with:
+    @pytest.mark.parametrize("enable_migrate_context_ids", [True])
+    """
+    return False
+
+
 @pytest.fixture
 def recorder_config() -> dict[str, Any] | None:
     """Fixture to override recorder config.
@@ -1280,6 +1290,7 @@ async def async_setup_recorder_instance(
     enable_nightly_purge: bool,
     enable_statistics: bool,
     enable_statistics_table_validation: bool,
+    enable_migrate_context_ids: bool,
 ) -> AsyncGenerator[RecorderInstanceGenerator, None]:
     """Yield callable to setup recorder instance."""
     # pylint: disable-next=import-outside-toplevel
@@ -1295,6 +1306,9 @@ async def async_setup_recorder_instance(
         if enable_statistics_table_validation
         else itertools.repeat(set())
     )
+    migrate_context_ids = (
+        recorder.Recorder._migrate_context_ids if enable_migrate_context_ids else None
+    )
     with patch(
         "homeassistant.components.recorder.Recorder.async_nightly_tasks",
         side_effect=nightly,
@@ -1307,6 +1321,10 @@ async def async_setup_recorder_instance(
         "homeassistant.components.recorder.migration.statistics_validate_db_schema",
         side_effect=stats_validate,
         autospec=True,
+    ), patch(
+        "homeassistant.components.recorder.Recorder._migrate_context_ids",
+        side_effect=migrate_context_ids,
+        autospec=True,
     ):
 
         async def async_setup_recorder(
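The patch above completes the opt-in wiring and follows the same pattern as the other recorder fixtures: with enable_migrate_context_ids at its default of False, side_effect is None and the autospec'd mock absorbs the call, so the migration never runs during unrelated recorder tests; when a test parametrizes the fixture to True, calls pass through to the real Recorder._migrate_context_ids. A hypothetical minimal opt-in, mirroring the marker already used in test_migrate_context_ids:

import pytest


@pytest.mark.parametrize("enable_migrate_context_ids", [True])
async def test_with_context_id_migration(async_setup_recorder_instance, hass) -> None:
    # The recorder instance created here will run the real context id migration.
    instance = await async_setup_recorder_instance(hass)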