Improve recorder data migrator tests (#133628)

This commit is contained in:
Erik Montnemery 2024-12-20 18:41:14 +01:00 committed by GitHub
parent 17f0c24895
commit a23b37114e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -13,6 +13,7 @@ import pytest
from sqlalchemy import create_engine, inspect from sqlalchemy import create_engine, inspect
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy.schema import Index
from homeassistant.components import recorder from homeassistant.components import recorder
from homeassistant.components.recorder import ( from homeassistant.components.recorder import (
@ -120,9 +121,11 @@ def db_schema_32():
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_event_context_ids", [True]) @pytest.mark.parametrize("enable_migrate_event_context_ids", [True])
@pytest.mark.parametrize("indices_to_drop", [[], [("events", "ix_events_context_id")]])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_events_context_ids( async def test_migrate_events_context_ids(
async_test_recorder: RecorderInstanceGenerator, async_test_recorder: RecorderInstanceGenerator,
indices_to_drop: list[tuple[str, str]],
) -> None: ) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format.""" """Test we can migrate old uuid context ids and ulid context ids to binary format."""
importlib.import_module(SCHEMA_MODULE_32) importlib.import_module(SCHEMA_MODULE_32)
@ -237,6 +240,13 @@ async def test_migrate_events_context_ids(
] ]
await _async_wait_migration_done(hass) await _async_wait_migration_done(hass)
# Remove index
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
for table, index in indices_to_drop:
with session_scope(hass=hass) as session:
assert get_index_by_name(session, table, index) is not None
migration._drop_index(instance.get_session, table, index)
await hass.async_stop() await hass.async_stop()
await hass.async_block_till_done() await hass.async_block_till_done()
@ -266,7 +276,13 @@ async def test_migrate_events_context_ids(
# Run again with new schema, let migration run # Run again with new schema, let migration run
async with async_test_home_assistant() as hass: async with async_test_home_assistant() as hass:
with freeze_time(now), instrument_migration(hass) as instrumented_migration: with (
freeze_time(now),
instrument_migration(hass) as instrumented_migration,
patch(
"sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
) as wrapped_idx_create,
):
async with async_test_recorder( async with async_test_recorder(
hass, wait_recorder=False, wait_recorder_setup=False hass, wait_recorder=False, wait_recorder_setup=False
) as instance: ) as instance:
@ -297,6 +313,10 @@ async def test_migrate_events_context_ids(
await hass.async_stop() await hass.async_stop()
await hass.async_block_till_done() await hass.async_block_till_done()
# Check the index we removed was recreated
index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
assert index_names == [index for _, index in indices_to_drop]
old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"] old_uuid_context_id_event = events_by_type["old_uuid_context_id_event"]
assert old_uuid_context_id_event["context_id"] is None assert old_uuid_context_id_event["context_id"] is None
assert old_uuid_context_id_event["context_user_id"] is None assert old_uuid_context_id_event["context_user_id"] is None
@ -482,9 +502,11 @@ async def test_finish_migrate_events_context_ids(
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_state_context_ids", [True]) @pytest.mark.parametrize("enable_migrate_state_context_ids", [True])
@pytest.mark.parametrize("indices_to_drop", [[], [("states", "ix_states_context_id")]])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_states_context_ids( async def test_migrate_states_context_ids(
async_test_recorder: RecorderInstanceGenerator, async_test_recorder: RecorderInstanceGenerator,
indices_to_drop: list[tuple[str, str]],
) -> None: ) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format.""" """Test we can migrate old uuid context ids and ulid context ids to binary format."""
importlib.import_module(SCHEMA_MODULE_32) importlib.import_module(SCHEMA_MODULE_32)
@ -577,6 +599,13 @@ async def test_migrate_states_context_ids(
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
await _async_wait_migration_done(hass) await _async_wait_migration_done(hass)
# Remove index
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
for table, index in indices_to_drop:
with session_scope(hass=hass) as session:
assert get_index_by_name(session, table, index) is not None
migration._drop_index(instance.get_session, table, index)
await hass.async_stop() await hass.async_stop()
await hass.async_block_till_done() await hass.async_block_till_done()
@ -606,7 +635,12 @@ async def test_migrate_states_context_ids(
# Run again with new schema, let migration run # Run again with new schema, let migration run
async with async_test_home_assistant() as hass: async with async_test_home_assistant() as hass:
with instrument_migration(hass) as instrumented_migration: with (
instrument_migration(hass) as instrumented_migration,
patch(
"sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
) as wrapped_idx_create,
):
async with async_test_recorder( async with async_test_recorder(
hass, wait_recorder=False, wait_recorder_setup=False hass, wait_recorder=False, wait_recorder_setup=False
) as instance: ) as instance:
@ -637,6 +671,10 @@ async def test_migrate_states_context_ids(
await hass.async_stop() await hass.async_stop()
await hass.async_block_till_done() await hass.async_block_till_done()
# Check the index we removed was recreated
index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
assert index_names == [index for _, index in indices_to_drop]
old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"] old_uuid_context_id = states_by_entity_id["state.old_uuid_context_id"]
assert old_uuid_context_id["context_id"] is None assert old_uuid_context_id["context_id"] is None
assert old_uuid_context_id["context_user_id"] is None assert old_uuid_context_id["context_user_id"] is None
@ -1049,9 +1087,13 @@ async def test_migrate_entity_ids(
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.parametrize("enable_migrate_entity_ids", [True]) @pytest.mark.parametrize("enable_migrate_entity_ids", [True])
@pytest.mark.parametrize(
"indices_to_drop", [[], [("states", "ix_states_entity_id_last_updated_ts")]]
)
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_post_migrate_entity_ids( async def test_post_migrate_entity_ids(
async_test_recorder: RecorderInstanceGenerator, async_test_recorder: RecorderInstanceGenerator,
indices_to_drop: list[tuple[str, str]],
) -> None: ) -> None:
"""Test we can migrate entity_ids to the StatesMeta table.""" """Test we can migrate entity_ids to the StatesMeta table."""
importlib.import_module(SCHEMA_MODULE_32) importlib.import_module(SCHEMA_MODULE_32)
@ -1096,6 +1138,13 @@ async def test_post_migrate_entity_ids(
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
await _async_wait_migration_done(hass) await _async_wait_migration_done(hass)
# Remove index
instance.recorder_and_worker_thread_ids.add(threading.get_ident())
for table, index in indices_to_drop:
with session_scope(hass=hass) as session:
assert get_index_by_name(session, table, index) is not None
migration._drop_index(instance.get_session, table, index)
await hass.async_stop() await hass.async_stop()
await hass.async_block_till_done() await hass.async_block_till_done()
@ -1109,6 +1158,9 @@ async def test_post_migrate_entity_ids(
return {state.state: state.entity_id for state in states} return {state.state: state.entity_id for state in states}
# Run again with new schema, let migration run # Run again with new schema, let migration run
with patch(
"sqlalchemy.schema.Index.create", autospec=True, wraps=Index.create
) as wrapped_idx_create:
async with ( async with (
async_test_home_assistant() as hass, async_test_home_assistant() as hass,
async_test_recorder(hass) as instance, async_test_recorder(hass) as instance,
@ -1119,11 +1171,26 @@ async def test_post_migrate_entity_ids(
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
states_by_state = await instance.async_add_executor_job(_fetch_migrated_states) states_by_state = await instance.async_add_executor_job(
_fetch_migrated_states
)
# Check the index which will be removed by the migrator no longer exists
with session_scope(hass=hass) as session:
assert (
get_index_by_name(
session, "states", "ix_states_entity_id_last_updated_ts"
)
is None
)
await hass.async_stop() await hass.async_stop()
await hass.async_block_till_done() await hass.async_block_till_done()
# Check the index we removed was recreated
index_names = [call[1][0].name for call in wrapped_idx_create.mock_calls]
assert index_names == [index for _, index in indices_to_drop]
assert states_by_state["one_1"] is None assert states_by_state["one_1"] is None
assert states_by_state["two_2"] is None assert states_by_state["two_2"] is None
assert states_by_state["two_1"] is None assert states_by_state["two_1"] is None