mirror of
https://github.com/home-assistant/core.git
synced 2025-04-24 17:27:52 +00:00
Ensure states table rebuild still happens if the event_id index was removed (#121938)
* Ensure states table rebuild still happens if the event_id index was removed If ix_states_event_id was removed by the foreign key still exists, the states table would not get rebuilt. This should not happen under normal circumstances and seems to only be possible if the index was removed manually or Home Assistant was restarted forcefully in the middle of a previous migration from years ago. * cover * fix tests * mysql wont allow at that point but thats ok as long as its gone at the end
This commit is contained in:
parent
1d62f0e380
commit
19d2d023ab
@ -16,7 +16,14 @@ import time
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import psutil_home_assistant as ha_psutil
|
||||
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update
|
||||
from sqlalchemy import (
|
||||
create_engine,
|
||||
event as sqlalchemy_event,
|
||||
exc,
|
||||
inspect,
|
||||
select,
|
||||
update,
|
||||
)
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.engine.interfaces import DBAPIConnection
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
@ -820,7 +827,7 @@ class Recorder(threading.Thread):
|
||||
# If ix_states_entity_id_last_updated_ts still exists
|
||||
# on the states table it means the entity id migration
|
||||
# finished by the EntityIDPostMigrationTask did not
|
||||
# because they restarted in the middle of it. We need
|
||||
# complete because they restarted in the middle of it. We need
|
||||
# to pick back up where we left off.
|
||||
if get_index_by_name(
|
||||
session,
|
||||
@ -832,9 +839,13 @@ class Recorder(threading.Thread):
|
||||
if self.schema_version > LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION:
|
||||
with contextlib.suppress(SQLAlchemyError):
|
||||
# If the index of event_ids on the states table is still present
|
||||
# we need to queue a task to remove it.
|
||||
if get_index_by_name(
|
||||
session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
|
||||
# or the event_id foreign key still exists we need to queue a
|
||||
# task to remove it.
|
||||
if (
|
||||
get_index_by_name(
|
||||
session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
|
||||
)
|
||||
or self._legacy_event_id_foreign_key_exists()
|
||||
):
|
||||
self.queue_task(EventIdMigrationTask())
|
||||
self.use_legacy_events_index = True
|
||||
@ -1285,6 +1296,21 @@ class Recorder(threading.Thread):
|
||||
"""Run post schema migration tasks."""
|
||||
migration.post_schema_migration(self, old_version, new_version)
|
||||
|
||||
def _legacy_event_id_foreign_key_exists(self) -> bool:
|
||||
"""Check if the legacy event_id foreign key exists."""
|
||||
engine = self.engine
|
||||
assert engine is not None
|
||||
return bool(
|
||||
next(
|
||||
(
|
||||
fk
|
||||
for fk in inspect(engine).get_foreign_keys(TABLE_STATES)
|
||||
if fk["constrained_columns"] == ["event_id"]
|
||||
),
|
||||
None,
|
||||
)
|
||||
)
|
||||
|
||||
def _migrate_states_context_ids(self) -> bool:
|
||||
"""Migrate states context ids if needed."""
|
||||
return migration.migrate_states_context_ids(self)
|
||||
|
@ -3,14 +3,14 @@
|
||||
from datetime import timedelta
|
||||
import importlib
|
||||
import sys
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import DEFAULT, patch
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import create_engine, inspect
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from homeassistant.components import recorder
|
||||
from homeassistant.components.recorder import core, statistics
|
||||
from homeassistant.components.recorder import core, migration, statistics
|
||||
from homeassistant.components.recorder.queries import select_event_type_ids
|
||||
from homeassistant.components.recorder.util import session_scope
|
||||
from homeassistant.core import EVENT_STATE_CHANGED, Event, EventOrigin, State
|
||||
@ -104,21 +104,14 @@ async def test_migrate_times(
|
||||
patch.object(core, "States", old_db_schema.States),
|
||||
patch.object(core, "Events", old_db_schema.Events),
|
||||
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_events_context_ids",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_states_context_ids",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_event_type_ids",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_entity_ids",
|
||||
),
|
||||
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids"
|
||||
patch.multiple(
|
||||
"homeassistant.components.recorder.Recorder",
|
||||
_migrate_events_context_ids=DEFAULT,
|
||||
_migrate_states_context_ids=DEFAULT,
|
||||
_migrate_event_type_ids=DEFAULT,
|
||||
_migrate_entity_ids=DEFAULT,
|
||||
_post_migrate_entity_ids=DEFAULT,
|
||||
_cleanup_legacy_states_event_ids=DEFAULT,
|
||||
),
|
||||
):
|
||||
async with (
|
||||
@ -267,21 +260,14 @@ async def test_migrate_can_resume_entity_id_post_migration(
|
||||
patch.object(core, "States", old_db_schema.States),
|
||||
patch.object(core, "Events", old_db_schema.Events),
|
||||
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_events_context_ids",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_states_context_ids",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_event_type_ids",
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._migrate_entity_ids",
|
||||
),
|
||||
patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids"
|
||||
patch.multiple(
|
||||
"homeassistant.components.recorder.Recorder",
|
||||
_migrate_events_context_ids=DEFAULT,
|
||||
_migrate_states_context_ids=DEFAULT,
|
||||
_migrate_event_type_ids=DEFAULT,
|
||||
_migrate_entity_ids=DEFAULT,
|
||||
_post_migrate_entity_ids=DEFAULT,
|
||||
_cleanup_legacy_states_event_ids=DEFAULT,
|
||||
),
|
||||
):
|
||||
async with (
|
||||
@ -328,5 +314,143 @@ async def test_migrate_can_resume_entity_id_post_migration(
|
||||
states_indexes = await instance.async_add_executor_job(_get_states_index_names)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
assert "ix_states_entity_id_last_updated_ts" not in states_index_names
|
||||
assert "ix_states_event_id" not in states_index_names
|
||||
|
||||
await hass.async_stop()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("persistent_database", [True])
|
||||
@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage
|
||||
async def test_migrate_can_resume_ix_states_event_id_removed(
|
||||
async_test_recorder: RecorderInstanceGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
recorder_db_url: str,
|
||||
) -> None:
|
||||
"""Test we resume the entity id post migration after a restart.
|
||||
|
||||
This case tests the migration still happens if
|
||||
ix_states_event_id is removed from the states table.
|
||||
"""
|
||||
importlib.import_module(SCHEMA_MODULE)
|
||||
old_db_schema = sys.modules[SCHEMA_MODULE]
|
||||
now = dt_util.utcnow()
|
||||
one_second_past = now - timedelta(seconds=1)
|
||||
mock_state = State(
|
||||
"sensor.test",
|
||||
"old",
|
||||
{"last_reset": now.isoformat()},
|
||||
last_changed=one_second_past,
|
||||
last_updated=now,
|
||||
)
|
||||
state_changed_event = Event(
|
||||
EVENT_STATE_CHANGED,
|
||||
{
|
||||
"entity_id": "sensor.test",
|
||||
"old_state": None,
|
||||
"new_state": mock_state,
|
||||
},
|
||||
EventOrigin.local,
|
||||
time_fired_timestamp=now.timestamp(),
|
||||
)
|
||||
custom_event = Event(
|
||||
"custom_event",
|
||||
{"entity_id": "sensor.custom"},
|
||||
EventOrigin.local,
|
||||
time_fired_timestamp=now.timestamp(),
|
||||
)
|
||||
number_of_migrations = 5
|
||||
|
||||
def _get_event_id_foreign_keys():
|
||||
assert instance.engine is not None
|
||||
return next(
|
||||
(
|
||||
fk # type: ignore[misc]
|
||||
for fk in inspect(instance.engine).get_foreign_keys("states")
|
||||
if fk["constrained_columns"] == ["event_id"]
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
def _get_states_index_names():
|
||||
with session_scope(hass=hass) as session:
|
||||
return inspect(session.connection()).get_indexes("states")
|
||||
|
||||
with (
|
||||
patch.object(recorder, "db_schema", old_db_schema),
|
||||
patch.object(
|
||||
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
|
||||
),
|
||||
patch.object(core, "StatesMeta", old_db_schema.StatesMeta),
|
||||
patch.object(core, "EventTypes", old_db_schema.EventTypes),
|
||||
patch.object(core, "EventData", old_db_schema.EventData),
|
||||
patch.object(core, "States", old_db_schema.States),
|
||||
patch.object(core, "Events", old_db_schema.Events),
|
||||
patch(CREATE_ENGINE_TARGET, new=_create_engine_test),
|
||||
patch.multiple(
|
||||
"homeassistant.components.recorder.Recorder",
|
||||
_migrate_events_context_ids=DEFAULT,
|
||||
_migrate_states_context_ids=DEFAULT,
|
||||
_migrate_event_type_ids=DEFAULT,
|
||||
_migrate_entity_ids=DEFAULT,
|
||||
_post_migrate_entity_ids=DEFAULT,
|
||||
_cleanup_legacy_states_event_ids=DEFAULT,
|
||||
),
|
||||
):
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
def _add_data():
|
||||
with session_scope(hass=hass) as session:
|
||||
session.add(old_db_schema.Events.from_event(custom_event))
|
||||
session.add(old_db_schema.States.from_event(state_changed_event))
|
||||
|
||||
await instance.async_add_executor_job(_add_data)
|
||||
await hass.async_block_till_done()
|
||||
await instance.async_block_till_done()
|
||||
|
||||
await instance.async_add_executor_job(
|
||||
migration._drop_index,
|
||||
instance.get_session,
|
||||
"states",
|
||||
"ix_states_event_id",
|
||||
)
|
||||
|
||||
states_indexes = await instance.async_add_executor_job(
|
||||
_get_states_index_names
|
||||
)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
assert instance.use_legacy_events_index is True
|
||||
assert (
|
||||
await instance.async_add_executor_job(_get_event_id_foreign_keys)
|
||||
is not None
|
||||
)
|
||||
|
||||
await hass.async_stop()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert "ix_states_entity_id_last_updated_ts" in states_index_names
|
||||
|
||||
async with (
|
||||
async_test_home_assistant() as hass,
|
||||
async_test_recorder(hass) as instance,
|
||||
):
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# We need to wait for all the migration tasks to complete
|
||||
# before we can check the database.
|
||||
for _ in range(number_of_migrations):
|
||||
await instance.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
states_indexes = await instance.async_add_executor_job(_get_states_index_names)
|
||||
states_index_names = {index["name"] for index in states_indexes}
|
||||
assert "ix_states_entity_id_last_updated_ts" not in states_index_names
|
||||
assert "ix_states_event_id" not in states_index_names
|
||||
assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None
|
||||
|
||||
await hass.async_stop()
|
||||
|
@ -1300,6 +1300,16 @@ def enable_migrate_entity_ids() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def enable_migrate_event_ids() -> bool:
|
||||
"""Fixture to control enabling of recorder's event id migration.
|
||||
|
||||
To enable context id migration, tests can be marked with:
|
||||
@pytest.mark.parametrize("enable_migrate_event_ids", [True])
|
||||
"""
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def recorder_config() -> dict[str, Any] | None:
|
||||
"""Fixture to override recorder config.
|
||||
@ -1416,6 +1426,7 @@ async def async_test_recorder(
|
||||
enable_migrate_context_ids: bool,
|
||||
enable_migrate_event_type_ids: bool,
|
||||
enable_migrate_entity_ids: bool,
|
||||
enable_migrate_event_ids: bool,
|
||||
) -> AsyncGenerator[RecorderInstanceGenerator]:
|
||||
"""Yield context manager to setup recorder instance."""
|
||||
# pylint: disable-next=import-outside-toplevel
|
||||
@ -1457,6 +1468,11 @@ async def async_test_recorder(
|
||||
migrate_entity_ids = (
|
||||
recorder.Recorder._migrate_entity_ids if enable_migrate_entity_ids else None
|
||||
)
|
||||
legacy_event_id_foreign_key_exists = (
|
||||
recorder.Recorder._legacy_event_id_foreign_key_exists
|
||||
if enable_migrate_event_ids
|
||||
else None
|
||||
)
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder.async_nightly_tasks",
|
||||
@ -1493,6 +1509,11 @@ async def async_test_recorder(
|
||||
side_effect=migrate_entity_ids,
|
||||
autospec=True,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._legacy_event_id_foreign_key_exists",
|
||||
side_effect=legacy_event_id_foreign_key_exists,
|
||||
autospec=True,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.recorder.Recorder._schedule_compile_missing_statistics",
|
||||
side_effect=compile_missing,
|
||||
|
Loading…
x
Reference in New Issue
Block a user