From 591ffe23400b6620efc4615fe682a4d0fd6d8568 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 19 Apr 2023 15:56:07 -1000 Subject: [PATCH] Fallback to generating a new ULID on migration if context is missing or invalid (#91704) * Fallback to generating a new ULID on migration if context is missing or invalid It was discovered that PostgreSQL will do a full scan if there is low cardinality on the index because of missing context ids. We will now generate a ULID from the timestamp of the row if the context data is missing or invalid. fixes #91514 * tests * tweak * tweak * preen --- .../components/recorder/migration.py | 19 ++-- homeassistant/components/recorder/queries.py | 2 + .../recorder/test_migration_from_schema_32.py | 93 ++++++++++++++----- 3 files changed, 83 insertions(+), 31 deletions(-) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 8b50b419f1b..c487f0b70d7 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -6,6 +6,7 @@ import contextlib from dataclasses import dataclass, replace as dataclass_replace from datetime import timedelta import logging +from time import time from typing import TYPE_CHECKING, cast from uuid import UUID @@ -26,7 +27,7 @@ from sqlalchemy.sql.expression import true from homeassistant.core import HomeAssistant from homeassistant.util.enum import try_parse_enum -from homeassistant.util.ulid import ulid_to_bytes +from homeassistant.util.ulid import ulid_at_time, ulid_to_bytes from .auto_repairs.events.schema import ( correct_db_schema as events_correct_db_schema, @@ -92,7 +93,6 @@ if TYPE_CHECKING: from . import Recorder LIVE_MIGRATION_MIN_SCHEMA_VERSION = 0 -_EMPTY_CONTEXT_ID = b"\x00" * 16 _EMPTY_ENTITY_ID = "missing.entity_id" _EMPTY_EVENT_TYPE = "missing_event_type" @@ -1370,6 +1370,11 @@ def _context_id_to_bytes(context_id: str | None) -> bytes | None: return None +def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes: + """Generate a ulid with a specific timestamp.""" + return ulid_to_bytes(ulid_at_time(timestamp or time())) + + @retryable_database_job("migrate states context_ids to binary format") def migrate_states_context_ids(instance: Recorder) -> bool: """Migrate states context_ids to use binary format.""" @@ -1384,13 +1389,14 @@ def migrate_states_context_ids(instance: Recorder) -> bool: { "state_id": state_id, "context_id": None, - "context_id_bin": _to_bytes(context_id) or _EMPTY_CONTEXT_ID, + "context_id_bin": _to_bytes(context_id) + or _generate_ulid_bytes_at_time(last_updated_ts), "context_user_id": None, "context_user_id_bin": _to_bytes(context_user_id), "context_parent_id": None, "context_parent_id_bin": _to_bytes(context_parent_id), } - for state_id, context_id, context_user_id, context_parent_id in states + for state_id, last_updated_ts, context_id, context_user_id, context_parent_id in states ], ) # If there is more work to do return False @@ -1418,13 +1424,14 @@ def migrate_events_context_ids(instance: Recorder) -> bool: { "event_id": event_id, "context_id": None, - "context_id_bin": _to_bytes(context_id) or _EMPTY_CONTEXT_ID, + "context_id_bin": _to_bytes(context_id) + or _generate_ulid_bytes_at_time(time_fired_ts), "context_user_id": None, "context_user_id_bin": _to_bytes(context_user_id), "context_parent_id": None, "context_parent_id_bin": _to_bytes(context_parent_id), } - for event_id, context_id, context_user_id, context_parent_id in events + for event_id, time_fired_ts, 
context_id, context_user_id, context_parent_id in events ], ) # If there is more work to do return False diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index 454c71f6dc5..f8a1b769d87 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -690,6 +690,7 @@ def find_events_context_ids_to_migrate() -> StatementLambdaElement: return lambda_stmt( lambda: select( Events.event_id, + Events.time_fired_ts, Events.context_id, Events.context_user_id, Events.context_parent_id, @@ -788,6 +789,7 @@ def find_states_context_ids_to_migrate() -> StatementLambdaElement: return lambda_stmt( lambda: select( States.state_id, + States.last_updated_ts, States.context_id, States.context_user_id, States.context_parent_id, diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index 01c086e119a..f76cf318008 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -5,6 +5,7 @@ import sys from unittest.mock import patch import uuid +from freezegun import freeze_time import pytest from sqlalchemy import create_engine, inspect from sqlalchemy.orm import Session @@ -28,7 +29,7 @@ from homeassistant.components.recorder.tasks import ( from homeassistant.components.recorder.util import session_scope from homeassistant.core import HomeAssistant import homeassistant.util.dt as dt_util -from homeassistant.util.ulid import bytes_to_ulid +from homeassistant.util.ulid import bytes_to_ulid, ulid_at_time, ulid_to_bytes from .common import async_recorder_block_till_done, async_wait_recording_done @@ -115,7 +116,7 @@ async def test_migrate_events_context_ids( event_data=None, origin_idx=0, time_fired=None, - time_fired_ts=1677721632.452529, + time_fired_ts=1877721632.452529, context_id=uuid_hex, context_id_bin=None, context_user_id=None, @@ -128,7 +129,7 @@ async def test_migrate_events_context_ids( event_data=None, origin_idx=0, time_fired=None, - time_fired_ts=1677721632.552529, + time_fired_ts=1877721632.552529, context_id=None, context_id_bin=None, context_user_id=None, @@ -141,7 +142,7 @@ async def test_migrate_events_context_ids( event_data=None, origin_idx=0, time_fired=None, - time_fired_ts=1677721632.552529, + time_fired_ts=1877721632.552529, context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV", context_id_bin=None, context_user_id="9400facee45711eaa9308bfd3d19e474", @@ -154,7 +155,7 @@ async def test_migrate_events_context_ids( event_data=None, origin_idx=0, time_fired=None, - time_fired_ts=1677721632.552529, + time_fired_ts=1877721632.552529, context_id="invalid", context_id_bin=None, context_user_id=None, @@ -167,7 +168,20 @@ async def test_migrate_events_context_ids( event_data=None, origin_idx=0, time_fired=None, - time_fired_ts=1677721632.552529, + time_fired_ts=1277721632.552529, + context_id="adapt_lgt:b'5Cf*':interval:b'0R'", + context_id_bin=None, + context_user_id=None, + context_user_id_bin=None, + context_parent_id=None, + context_parent_id_bin=None, + ), + Events( + event_type="event_with_garbage_context_id_no_time_fired_ts", + event_data=None, + origin_idx=0, + time_fired=None, + time_fired_ts=None, context_id="adapt_lgt:b'5Cf*':interval:b'0R'", context_id_bin=None, context_user_id=None, @@ -181,9 +195,12 @@ async def test_migrate_events_context_ids( await instance.async_add_executor_job(_insert_events) await async_wait_recording_done(hass) - # This is 
a threadsafe way to add a task to the recorder - instance.queue_task(EventsContextIDMigrationTask()) - await async_recorder_block_till_done(hass) + now = dt_util.utcnow() + expected_ulid_fallback_start = ulid_to_bytes(ulid_at_time(now.timestamp()))[0:6] + with freeze_time(now): + # This is a threadsafe way to add a task to the recorder + instance.queue_task(EventsContextIDMigrationTask()) + await async_recorder_block_till_done(hass) def _object_as_dict(obj): return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs} @@ -200,12 +217,13 @@ async def test_migrate_events_context_ids( "ulid_context_id_event", "invalid_context_id_event", "garbage_context_id_event", + "event_with_garbage_context_id_no_time_fired_ts", ] ) ) .all() ) - assert len(events) == 5 + assert len(events) == 6 return {event.event_type: _object_as_dict(event) for event in events} events_by_type = await instance.async_add_executor_job(_fetch_migrated_events) @@ -222,7 +240,9 @@ async def test_migrate_events_context_ids( assert empty_context_id_event["context_id"] is None assert empty_context_id_event["context_user_id"] is None assert empty_context_id_event["context_parent_id"] is None - assert empty_context_id_event["context_id_bin"] == b"\x00" * 16 + assert empty_context_id_event["context_id_bin"].startswith( + b"\x01\xb50\xeeO(" + ) # 6 bytes of timestamp + random assert empty_context_id_event["context_user_id_bin"] is None assert empty_context_id_event["context_parent_id_bin"] is None @@ -247,7 +267,9 @@ async def test_migrate_events_context_ids( assert invalid_context_id_event["context_id"] is None assert invalid_context_id_event["context_user_id"] is None assert invalid_context_id_event["context_parent_id"] is None - assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16 + assert invalid_context_id_event["context_id_bin"].startswith( + b"\x01\xb50\xeeO(" + ) # 6 bytes of timestamp + random assert invalid_context_id_event["context_user_id_bin"] is None assert invalid_context_id_event["context_parent_id_bin"] is None @@ -255,10 +277,26 @@ async def test_migrate_events_context_ids( assert garbage_context_id_event["context_id"] is None assert garbage_context_id_event["context_user_id"] is None assert garbage_context_id_event["context_parent_id"] is None - assert garbage_context_id_event["context_id_bin"] == b"\x00" * 16 + assert garbage_context_id_event["context_id_bin"].startswith( + b"\x01)~$\xdf(" + ) # 6 bytes of timestamp + random assert garbage_context_id_event["context_user_id_bin"] is None assert garbage_context_id_event["context_parent_id_bin"] is None + event_with_garbage_context_id_no_time_fired_ts = events_by_type[ + "event_with_garbage_context_id_no_time_fired_ts" + ] + assert event_with_garbage_context_id_no_time_fired_ts["context_id"] is None + assert event_with_garbage_context_id_no_time_fired_ts["context_user_id"] is None + assert event_with_garbage_context_id_no_time_fired_ts["context_parent_id"] is None + assert event_with_garbage_context_id_no_time_fired_ts["context_id_bin"].startswith( + expected_ulid_fallback_start + ) # 6 bytes of timestamp + random + assert event_with_garbage_context_id_no_time_fired_ts["context_user_id_bin"] is None + assert ( + event_with_garbage_context_id_no_time_fired_ts["context_parent_id_bin"] is None + ) + @pytest.mark.parametrize("enable_migrate_context_ids", [True]) async def test_migrate_states_context_ids( @@ -272,13 +310,13 @@ async def test_migrate_states_context_ids( uuid_hex = test_uuid.hex uuid_bin = test_uuid.bytes - def 
_insert_events(): + def _insert_states(): with session_scope(hass=hass) as session: session.add_all( ( States( entity_id="state.old_uuid_context_id", - last_updated_ts=1677721632.452529, + last_updated_ts=1477721632.452529, context_id=uuid_hex, context_id_bin=None, context_user_id=None, @@ -288,7 +326,7 @@ async def test_migrate_states_context_ids( ), States( entity_id="state.empty_context_id", - last_updated_ts=1677721632.552529, + last_updated_ts=1477721632.552529, context_id=None, context_id_bin=None, context_user_id=None, @@ -298,7 +336,7 @@ async def test_migrate_states_context_ids( ), States( entity_id="state.ulid_context_id", - last_updated_ts=1677721632.552529, + last_updated_ts=1477721632.552529, context_id="01ARZ3NDEKTSV4RRFFQ69G5FAV", context_id_bin=None, context_user_id="9400facee45711eaa9308bfd3d19e474", @@ -308,7 +346,7 @@ async def test_migrate_states_context_ids( ), States( entity_id="state.invalid_context_id", - last_updated_ts=1677721632.552529, + last_updated_ts=1477721632.552529, context_id="invalid", context_id_bin=None, context_user_id=None, @@ -318,7 +356,7 @@ async def test_migrate_states_context_ids( ), States( entity_id="state.garbage_context_id", - last_updated_ts=1677721632.552529, + last_updated_ts=1477721632.552529, context_id="adapt_lgt:b'5Cf*':interval:b'0R'", context_id_bin=None, context_user_id=None, @@ -328,7 +366,7 @@ async def test_migrate_states_context_ids( ), States( entity_id="state.human_readable_uuid_context_id", - last_updated_ts=1677721632.552529, + last_updated_ts=1477721632.552529, context_id="0ae29799-ee4e-4f45-8116-f582d7d3ee65", context_id_bin=None, context_user_id="0ae29799-ee4e-4f45-8116-f582d7d3ee65", @@ -339,10 +377,9 @@ async def test_migrate_states_context_ids( ) ) - await instance.async_add_executor_job(_insert_events) + await instance.async_add_executor_job(_insert_states) await async_wait_recording_done(hass) - # This is a threadsafe way to add a task to the recorder instance.queue_task(StatesContextIDMigrationTask()) await async_recorder_block_till_done(hass) @@ -384,7 +421,9 @@ async def test_migrate_states_context_ids( assert empty_context_id["context_id"] is None assert empty_context_id["context_user_id"] is None assert empty_context_id["context_parent_id"] is None - assert empty_context_id["context_id_bin"] == b"\x00" * 16 + assert empty_context_id["context_id_bin"].startswith( + b"\x01X\x0f\x12\xaf(" + ) # 6 bytes of timestamp + random assert empty_context_id["context_user_id_bin"] is None assert empty_context_id["context_parent_id_bin"] is None @@ -408,7 +447,9 @@ async def test_migrate_states_context_ids( assert invalid_context_id["context_id"] is None assert invalid_context_id["context_user_id"] is None assert invalid_context_id["context_parent_id"] is None - assert invalid_context_id["context_id_bin"] == b"\x00" * 16 + assert invalid_context_id["context_id_bin"].startswith( + b"\x01X\x0f\x12\xaf(" + ) # 6 bytes of timestamp + random assert invalid_context_id["context_user_id_bin"] is None assert invalid_context_id["context_parent_id_bin"] is None @@ -416,7 +457,9 @@ async def test_migrate_states_context_ids( assert garbage_context_id["context_id"] is None assert garbage_context_id["context_user_id"] is None assert garbage_context_id["context_parent_id"] is None - assert garbage_context_id["context_id_bin"] == b"\x00" * 16 + assert garbage_context_id["context_id_bin"].startswith( + b"\x01X\x0f\x12\xaf(" + ) # 6 bytes of timestamp + random assert garbage_context_id["context_user_id_bin"] is None assert 
garbage_context_id["context_parent_id_bin"] is None
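Editor's note: the fallback in this patch relies on the standard ULID layout, where the first 48 bits are a big-endian millisecond timestamp and the remaining 80 bits are random. That is why replacing the old all-zero _EMPTY_CONTEXT_ID with a ULID generated from the row's own timestamp restores cardinality on the context_id_bin index. A minimal stdlib-only sketch of what _generate_ulid_bytes_at_time produces, assuming that layout (the patch itself delegates to homeassistant.util.ulid.ulid_at_time and ulid_to_bytes; the helper below is illustrative, not the library code):

import secrets
from time import time

def generate_ulid_bytes_at_time(timestamp: float | None) -> bytes:
    # 16 ULID bytes: 6 bytes of millisecond timestamp (big-endian) + 10 random bytes.
    # Mirrors the patch's `timestamp or time()` fallback when the row has no timestamp.
    ts_ms = int((timestamp or time()) * 1000)
    return ts_ms.to_bytes(6, "big") + secrets.token_bytes(10)

Because the timestamp bytes lead, context ids generated this way still index and sort roughly in time order instead of collapsing onto a single duplicated all-zero value.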
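The tests assert only startswith(...) on the first 6 bytes because the trailing 10 bytes are random. Those expected prefixes follow directly from the fixture timestamps; a quick sanity check, assuming the ULID timestamp field is int(timestamp * 1000) (the expected_ulid_prefix helper name is ours, not part of the test suite):

def expected_ulid_prefix(timestamp: float) -> bytes:
    # First 6 bytes of a ULID generated at `timestamp`: milliseconds, big-endian.
    return int(timestamp * 1000).to_bytes(6, "big")

assert expected_ulid_prefix(1877721632.552529) == b"\x01\xb50\xeeO("     # events fixtures
assert expected_ulid_prefix(1277721632.552529) == b"\x01)~$\xdf("        # garbage_context_id_event
assert expected_ulid_prefix(1477721632.552529) == b"\x01X\x0f\x12\xaf("  # states fixtures

For the event with no time_fired_ts at all, the test instead freezes time and compares against ulid_to_bytes(ulid_at_time(now.timestamp()))[0:6], i.e. the same 6-byte prefix computed at "now".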