Rename constant in tests/components/recorder/test_migration_from_schema_32.py (#131819)

This commit is contained in:
Erik Montnemery 2024-11-28 13:26:58 +01:00 committed by GitHub
parent f41bc98fe2
commit d9832f8c3a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -55,7 +55,7 @@ from tests.common import async_test_home_assistant
from tests.typing import RecorderInstanceGenerator from tests.typing import RecorderInstanceGenerator
CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine" CREATE_ENGINE_TARGET = "homeassistant.components.recorder.core.create_engine"
SCHEMA_MODULE = "tests.components.recorder.db_schema_32" SCHEMA_MODULE_32 = "tests.components.recorder.db_schema_32"
@pytest.fixture @pytest.fixture
@ -81,8 +81,8 @@ def _create_engine_test(*args, **kwargs):
This simulates an existing db with the old schema. This simulates an existing db with the old schema.
""" """
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
engine = create_engine(*args, **kwargs) engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine) old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session: with Session(engine) as session:
@ -101,8 +101,8 @@ def _create_engine_test(*args, **kwargs):
@pytest.fixture @pytest.fixture
def db_schema_32(): def db_schema_32():
"""Fixture to initialize the db with the old schema.""" """Fixture to initialize the db with the old schema."""
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
with ( with (
patch.object(recorder, "db_schema", old_db_schema), patch.object(recorder, "db_schema", old_db_schema),
@ -127,8 +127,8 @@ async def test_migrate_events_context_ids(
async_test_recorder: RecorderInstanceGenerator, async_test_recorder: RecorderInstanceGenerator,
) -> None: ) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format.""" """Test we can migrate old uuid context ids and ulid context ids to binary format."""
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
test_uuid = uuid.uuid4() test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex uuid_hex = test_uuid.hex
@ -386,8 +386,8 @@ async def test_finish_migrate_events_context_ids(
mark the migration as done before ensuring unused indices were dropped. This mark the migration as done before ensuring unused indices were dropped. This
test makes sure we drop the unused indices. test makes sure we drop the unused indices.
""" """
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_migration(): def _insert_migration():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -489,8 +489,8 @@ async def test_migrate_states_context_ids(
async_test_recorder: RecorderInstanceGenerator, async_test_recorder: RecorderInstanceGenerator,
) -> None: ) -> None:
"""Test we can migrate old uuid context ids and ulid context ids to binary format.""" """Test we can migrate old uuid context ids and ulid context ids to binary format."""
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
test_uuid = uuid.uuid4() test_uuid = uuid.uuid4()
uuid_hex = test_uuid.hex uuid_hex = test_uuid.hex
@ -730,8 +730,8 @@ async def test_finish_migrate_states_context_ids(
mark the migration as done before ensuring unused indices were dropped. This mark the migration as done before ensuring unused indices were dropped. This
test makes sure we drop the unused indices. test makes sure we drop the unused indices.
""" """
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_migration(): def _insert_migration():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -833,8 +833,8 @@ async def test_migrate_event_type_ids(
) -> None: ) -> None:
"""Test we can migrate event_types to the EventTypes table.""" """Test we can migrate event_types to the EventTypes table."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_events(): def _insert_events():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -924,8 +924,8 @@ async def test_migrate_event_type_ids(
async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) -> None: async def test_migrate_entity_ids(hass: HomeAssistant, recorder_mock: Recorder) -> None:
"""Test we can migrate entity_ids to the StatesMeta table.""" """Test we can migrate entity_ids to the StatesMeta table."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_states(): def _insert_states():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -1004,8 +1004,8 @@ async def test_post_migrate_entity_ids(
) -> None: ) -> None:
"""Test we can migrate entity_ids to the StatesMeta table.""" """Test we can migrate entity_ids to the StatesMeta table."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_events(): def _insert_events():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -1060,8 +1060,8 @@ async def test_migrate_null_entity_ids(
) -> None: ) -> None:
"""Test we can migrate entity_ids to the StatesMeta table.""" """Test we can migrate entity_ids to the StatesMeta table."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_states(): def _insert_states():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -1145,8 +1145,8 @@ async def test_migrate_null_event_type_ids(
) -> None: ) -> None:
"""Test we can migrate event_types to the EventTypes table when the event_type is NULL.""" """Test we can migrate event_types to the EventTypes table when the event_type is NULL."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
def _insert_events(): def _insert_events():
with session_scope(hass=hass) as session: with session_scope(hass=hass) as session:
@ -1233,8 +1233,8 @@ async def test_stats_timestamp_conversion_is_reentrant(
"""Test stats migration is reentrant.""" """Test stats migration is reentrant."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
await async_attach_db_engine(hass) await async_attach_db_engine(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
now = dt_util.utcnow() now = dt_util.utcnow()
one_year_ago = now - datetime.timedelta(days=365) one_year_ago = now - datetime.timedelta(days=365)
six_months_ago = now - datetime.timedelta(days=180) six_months_ago = now - datetime.timedelta(days=180)
@ -1386,8 +1386,8 @@ async def test_stats_timestamp_with_one_by_one(
"""Test stats migration with one by one.""" """Test stats migration with one by one."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
await async_attach_db_engine(hass) await async_attach_db_engine(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
now = dt_util.utcnow() now = dt_util.utcnow()
one_year_ago = now - datetime.timedelta(days=365) one_year_ago = now - datetime.timedelta(days=365)
six_months_ago = now - datetime.timedelta(days=180) six_months_ago = now - datetime.timedelta(days=180)
@ -1606,8 +1606,8 @@ async def test_stats_timestamp_with_one_by_one_removes_duplicates(
"""Test stats migration with one by one removes duplicates.""" """Test stats migration with one by one removes duplicates."""
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
await async_attach_db_engine(hass) await async_attach_db_engine(hass)
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
now = dt_util.utcnow() now = dt_util.utcnow()
one_year_ago = now - datetime.timedelta(days=365) one_year_ago = now - datetime.timedelta(days=365)
six_months_ago = now - datetime.timedelta(days=180) six_months_ago = now - datetime.timedelta(days=180)
@ -1798,13 +1798,13 @@ async def test_stats_timestamp_with_one_by_one_removes_duplicates(
@pytest.mark.parametrize("persistent_database", [True]) @pytest.mark.parametrize("persistent_database", [True])
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage @pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
async def test_migrate_times( async def test_stats_migrate_times(
async_test_recorder: RecorderInstanceGenerator, async_test_recorder: RecorderInstanceGenerator,
caplog: pytest.LogCaptureFixture, caplog: pytest.LogCaptureFixture,
) -> None: ) -> None:
"""Test we can migrate times in the statistics tables.""" """Test we can migrate times in the statistics tables."""
importlib.import_module(SCHEMA_MODULE) importlib.import_module(SCHEMA_MODULE_32)
old_db_schema = sys.modules[SCHEMA_MODULE] old_db_schema = sys.modules[SCHEMA_MODULE_32]
now = dt_util.utcnow() now = dt_util.utcnow()
now_timestamp = now.timestamp() now_timestamp = now.timestamp()