From 942fbdedcf3f91ce80f7eeab1861687c9a3fdf9a Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 26 Dec 2024 07:48:55 -1000 Subject: [PATCH] Ensure all states have been migrated to use timestamps (#134007) --- .../components/recorder/db_schema.py | 2 +- .../components/recorder/migration.py | 17 ++- .../recorder/test_migration_from_schema_32.py | 140 ++++++++++++++++++ 3 files changed, 156 insertions(+), 3 deletions(-) diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index fa4162f4183..2afbed9cb75 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -77,7 +77,7 @@ class LegacyBase(DeclarativeBase): """Base class for tables, used for schema migration.""" -SCHEMA_VERSION = 47 +SCHEMA_VERSION = 48 _LOGGER = logging.getLogger(__name__) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index d57db03f90e..8c9252ba28b 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -1976,6 +1976,17 @@ class _SchemaVersion47Migrator(_SchemaVersionMigrator, target_version=47): ) +class _SchemaVersion48Migrator(_SchemaVersionMigrator, target_version=48): + def _apply_update(self) -> None: + """Version specific update method.""" + # https://github.com/home-assistant/core/issues/134002 + # If the system has unmigrated states rows, we need to + # ensure they are migrated now so the new optimized + # queries can be used. For most systems, this should + # be very fast and nothing will be migrated. 
+ _migrate_columns_to_timestamp(self.instance, self.session_maker, self.engine) + + def _migrate_statistics_columns_to_timestamp_removing_duplicates( hass: HomeAssistant, instance: Recorder, @@ -2109,7 +2120,8 @@ def _migrate_columns_to_timestamp( connection.execute( text( 'UPDATE events set time_fired_ts=strftime("%s",time_fired) + ' - "cast(substr(time_fired,-7) AS FLOAT);" + "cast(substr(time_fired,-7) AS FLOAT) " + "WHERE time_fired_ts is NULL;" ) ) connection.execute( @@ -2117,7 +2129,8 @@ def _migrate_columns_to_timestamp( 'UPDATE states set last_updated_ts=strftime("%s",last_updated) + ' "cast(substr(last_updated,-7) AS FLOAT), " 'last_changed_ts=strftime("%s",last_changed) + ' - "cast(substr(last_changed,-7) AS FLOAT);" + "cast(substr(last_changed,-7) AS FLOAT) " + " WHERE last_updated_ts is NULL;" ) ) elif engine.dialect.name == SupportedDialect.MYSQL: diff --git a/tests/components/recorder/test_migration_from_schema_32.py b/tests/components/recorder/test_migration_from_schema_32.py index 3cc654c0fa1..0624955b0e9 100644 --- a/tests/components/recorder/test_migration_from_schema_32.py +++ b/tests/components/recorder/test_migration_from_schema_32.py @@ -2142,3 +2142,143 @@ async def test_stats_migrate_times( ) await hass.async_stop() + + +@pytest.mark.parametrize("persistent_database", [True]) +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage +async def test_cleanup_unmigrated_state_timestamps( + async_test_recorder: RecorderInstanceGenerator, +) -> None: + """Ensure schema 48 migration cleans up any unmigrated state timestamps.""" + importlib.import_module(SCHEMA_MODULE_32) + old_db_schema = sys.modules[SCHEMA_MODULE_32] + + test_uuid = uuid.uuid4() + uuid_hex = test_uuid.hex + + def _object_as_dict(obj): + return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs} + + def _insert_states(): + with session_scope(hass=hass) as session: + state1 = old_db_schema.States( + entity_id="state.test_state1", + 
last_updated=datetime.datetime( + 2016, 10, 28, 20, 13, 52, 452529, tzinfo=datetime.UTC + ), + last_updated_ts=None, + last_changed=datetime.datetime( + 2016, 10, 28, 20, 13, 52, 452529, tzinfo=datetime.UTC + ), + last_changed_ts=None, + context_id=uuid_hex, + context_id_bin=None, + context_user_id=None, + context_user_id_bin=None, + context_parent_id=None, + context_parent_id_bin=None, + ) + state2 = old_db_schema.States( + entity_id="state.test_state2", + last_updated=datetime.datetime( + 2016, 10, 28, 20, 13, 52, 552529, tzinfo=datetime.UTC + ), + last_updated_ts=None, + last_changed=datetime.datetime( + 2016, 10, 28, 20, 13, 52, 452529, tzinfo=datetime.UTC + ), + last_changed_ts=None, + context_id=None, + context_id_bin=None, + context_user_id=None, + context_user_id_bin=None, + context_parent_id=None, + context_parent_id_bin=None, + ) + session.add_all((state1, state2)) + # There is a default of now() for last_updated_ts so make sure it's not set + session.query(old_db_schema.States).update( + {old_db_schema.States.last_updated_ts: None} + ) + state3 = old_db_schema.States( + entity_id="state.already_migrated", + last_updated=None, + last_updated_ts=1477685632.452529, + last_changed=None, + last_changed_ts=1477685632.452529, + context_id=uuid_hex, + context_id_bin=None, + context_user_id=None, + context_user_id_bin=None, + context_parent_id=None, + context_parent_id_bin=None, + ) + session.add_all((state3,)) + + with session_scope(hass=hass, read_only=True) as session: + states = session.query(old_db_schema.States).all() + assert len(states) == 3 + + # Create database with old schema + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object(migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await instance.async_add_executor_job(_insert_states) + + await 
async_wait_recording_done(hass) + now = dt_util.utcnow() + await _async_wait_migration_done(hass) + await async_wait_recording_done(hass) + + await hass.async_stop() + await hass.async_block_till_done() + + def _fetch_migrated_states(): + with session_scope(hass=hass) as session: + states = session.query(States).all() + assert len(states) == 3 + return {state.state_id: _object_as_dict(state) for state in states} + + # Run again with new schema, let migration run + async with async_test_home_assistant() as hass: + with ( + freeze_time(now), + instrument_migration(hass) as instrumented_migration, + ): + async with async_test_recorder( + hass, wait_recorder=False, wait_recorder_setup=False + ) as instance: + # Check the migration is considered non-live + assert recorder.util.async_migration_is_live(hass) is False + instrumented_migration.migration_stall.set() + instance.recorder_and_worker_thread_ids.add(threading.get_ident()) + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + states_by_metadata_id = await instance.async_add_executor_job( + _fetch_migrated_states + ) + + await hass.async_stop() + await hass.async_block_till_done() + + assert len(states_by_metadata_id) == 3 + for state in states_by_metadata_id.values(): + assert state["last_updated_ts"] is not None + + by_entity_id = { + state["entity_id"]: state for state in states_by_metadata_id.values() + } + assert by_entity_id["state.test_state1"]["last_updated_ts"] == 1477685632.452529 + assert by_entity_id["state.test_state2"]["last_updated_ts"] == 1477685632.552529 + assert ( + by_entity_id["state.already_migrated"]["last_updated_ts"] == 1477685632.452529 + )