Fix old indices never being removed with PostgreSQL (#89599)

J. Nick Koston 2023-03-12 14:07:05 -10:00 committed by GitHub
parent e809b636e6
commit 459ea048ba
3 changed files with 117 additions and 58 deletions
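
The root cause: on PostgreSQL databases that were imported from another engine, index names can carry an engine-added prefix such as `idx_16532_ix_events_event_type_time_fired`, so the plain `DROP INDEX ix_events_event_type_time_fired` issued by earlier migrations never matched and the old indexes were left behind. The fallback this commit adds to `_drop_index` reflects the table's actual indexes and matches by suffix instead. Below is a minimal standalone sketch of that lookup, assuming a plain SQLAlchemy `Connection`; `drop_prefixed_index` is a hypothetical helper name, and as in the real function the index name is interpolated into the SQL unsanitized, so it must never take user input.

import sqlalchemy
from sqlalchemy import text
from sqlalchemy.engine import Connection


def drop_prefixed_index(connection: Connection, table_name: str, index_name: str) -> bool:
    """Drop index_name from table_name, tolerating an engine-added prefix.

    Mirrors the fallback this commit adds: reflect the table's indexes and
    match on the "_<index_name>" suffix (e.g. idx_16532_ix_events_time_fired).
    Returns True if a matching index was found and dropped.
    """
    inspector = sqlalchemy.inspect(connection)
    index_to_drop = next(
        (
            index["name"]
            for index in inspector.get_indexes(table_name)
            if index["name"] and index["name"].endswith(f"_{index_name}")
        ),
        None,
    )
    if index_to_drop is None:
        return False
    # Same caveat as _drop_index: the statement is built by string
    # interpolation, so never call this with user-supplied names.
    connection.execute(text(f"DROP INDEX {index_to_drop}"))
    return True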

View File

@@ -68,7 +68,7 @@ class Base(DeclarativeBase):
     """Base class for tables."""

-SCHEMA_VERSION = 38
+SCHEMA_VERSION = 39

 _LOGGER = logging.getLogger(__name__)

View File

@@ -180,7 +180,9 @@ def migrate_schema(
         with session_scope(session=session_maker()) as session:
             session.add(SchemaChanges(schema_version=new_version))
-        _LOGGER.info("Upgrade to version %s done", new_version)
+        # Log at the same level as the long schema changes
+        # so it's clear that the upgrade is done
+        _LOGGER.warning("Upgrade to version %s done", new_version)

     if schema_errors := schema_status.statistics_schema_errors:
         _LOGGER.warning(
@@ -215,11 +217,12 @@ def _create_index(
     _LOGGER.debug("Creating %s index", index_name)
     _LOGGER.warning(
         (
-            "Adding index `%s` to database. Note: this can take several "
+            "Adding index `%s` to table `%s`. Note: this can take several "
             "minutes on large databases and slow computers. Please "
             "be patient!"
         ),
         index_name,
+        table_name,
     )
     with session_scope(session=session_maker()) as session:
         try:
@@ -250,51 +253,74 @@ def _drop_index(
     string here is generated from the method parameters without sanitizing.
     DO NOT USE THIS FUNCTION IN ANY OPERATION THAT TAKES USER INPUT.
     """
-    _LOGGER.debug("Dropping index %s from table %s", index_name, table_name)
+    _LOGGER.warning(
+        (
+            "Dropping index `%s` from table `%s`. Note: this can take several "
+            "minutes on large databases and slow computers. Please "
+            "be patient!"
+        ),
+        index_name,
+        table_name,
+    )
     success = False

     # Engines like DB2/Oracle
-    with session_scope(session=session_maker()) as session:
-        try:
-            connection = session.connection()
-            connection.execute(text(f"DROP INDEX {index_name}"))
-        except SQLAlchemyError:
-            pass
-        else:
-            success = True
+    with session_scope(session=session_maker()) as session, contextlib.suppress(
+        SQLAlchemyError
+    ):
+        connection = session.connection()
+        connection.execute(text(f"DROP INDEX {index_name}"))
+        success = True

     # Engines like SQLite, SQL Server
     if not success:
-        with session_scope(session=session_maker()) as session:
-            try:
-                connection = session.connection()
-                connection.execute(
-                    text(
-                        "DROP INDEX {table}.{index}".format(
-                            index=index_name, table=table_name
-                        )
-                    )
-                )
-            except SQLAlchemyError:
-                pass
-            else:
-                success = True
+        with session_scope(session=session_maker()) as session, contextlib.suppress(
+            SQLAlchemyError
+        ):
+            connection = session.connection()
+            connection.execute(
+                text(
+                    "DROP INDEX {table}.{index}".format(
+                        index=index_name, table=table_name
+                    )
+                )
+            )
+            success = True

     if not success:
         # Engines like MySQL, MS Access
-        with session_scope(session=session_maker()) as session:
-            try:
-                connection = session.connection()
-                connection.execute(
-                    text(
-                        "DROP INDEX {index} ON {table}".format(
-                            index=index_name, table=table_name
-                        )
-                    )
-                )
-            except SQLAlchemyError:
-                pass
-            else:
-                success = True
+        with session_scope(session=session_maker()) as session, contextlib.suppress(
+            SQLAlchemyError
+        ):
+            connection = session.connection()
+            connection.execute(
+                text(
+                    "DROP INDEX {index} ON {table}".format(
+                        index=index_name, table=table_name
+                    )
+                )
+            )
+            success = True
+
+    if not success:
+        # Engines like postgresql may have a prefix
+        # ex idx_16532_ix_events_event_type_time_fired
+        with session_scope(session=session_maker()) as session, contextlib.suppress(
+            SQLAlchemyError
+        ):
+            connection = session.connection()
+            inspector = sqlalchemy.inspect(connection)
+            indexes = inspector.get_indexes(table_name)
+            if index_to_drop := next(
+                (
+                    possible_index["name"]
+                    for possible_index in indexes
+                    if possible_index["name"]
+                    and possible_index["name"].endswith(f"_{index_name}")
+                ),
+                None,
+            ):
+                connection.execute(text(f"DROP INDEX {index_to_drop}"))
+                success = True

     if success:
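
Aside from the new PostgreSQL fallback, this hunk collapses each try/except SQLAlchemyError: pass/else chain into a single contextlib.suppress(SQLAlchemyError) context manager stacked onto session_scope. The two forms behave identically here because suppress exits the block at the first error, so the trailing success = True only runs when the DROP succeeded. A minimal sketch of one such attempt; the function name try_drop is illustrative, not from the commit:

import contextlib

from sqlalchemy import text
from sqlalchemy.engine import Connection
from sqlalchemy.exc import SQLAlchemyError


def try_drop(connection: Connection, index_name: str) -> bool:
    """One engine-specific attempt, in the style of the refactored _drop_index."""
    success = False
    with contextlib.suppress(SQLAlchemyError):
        # If the DROP raises, suppress() leaves the block immediately and
        # success stays False -- same outcome as try/except/pass/else.
        connection.execute(text(f"DROP INDEX {index_name}"))
        success = True
    return success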
@@ -306,26 +332,9 @@ def _drop_index(

     if quiet:
         return
-    if index_name in (
-        "ix_states_entity_id",
-        "ix_states_context_parent_id",
-        "ix_statistics_short_term_statistic_id_start",
-        "ix_statistics_statistic_id_start",
-    ):
-        # ix_states_context_parent_id was only there on nightly so we do not want
-        # to generate log noise or issues about it.
-        #
-        # ix_states_entity_id was only there for users who upgraded from schema
-        # version 8 or earlier. Newer installs will not have it so we do not
-        # want to generate log noise or issues about it.
-        #
-        # ix_statistics_short_term_statistic_id_start and ix_statistics_statistic_id_start
-        # were only there for users who upgraded from schema version 23 or earlier.
-        return
     _LOGGER.warning(
         (
-            "Failed to drop index %s from table %s. Schema "
+            "Failed to drop index `%s` from table `%s`. Schema "
             "Migration will continue; this is not a "
             "critical operation"
         ),
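
With this change the hard-coded allow-list of expected-to-be-missing indexes is gone: instead of _drop_index deciding which failures to keep quiet about, each call site that may legitimately find an index already absent (for example the statistics indexes that only existed for installs upgraded from schema version 23 or earlier) now passes quiet=True, which returns before the "Failed to drop index" warning is logged, as the hunks below show.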
@@ -902,7 +911,8 @@ def _apply_update(  # noqa: C901
         # This index is no longer used and can cause MySQL to use the wrong index
         # when querying the states table.
         # https://github.com/home-assistant/core/issues/83787
-        _drop_index(session_maker, "states", "ix_states_entity_id")
+        # There was an index cleanup here but it's now done in schema 39
+        pass
     elif new_version == 34:
         # Once we require SQLite >= 3.35.5, we should drop the columns:
         # ALTER TABLE statistics DROP COLUMN created
@@ -964,11 +974,14 @@ def _apply_update(  # noqa: C901
     elif new_version == 35:
         # Migration is done in two steps to ensure we can start using
         # the new columns before we wipe the old ones.
-        _drop_index(session_maker, "statistics", "ix_statistics_statistic_id_start")
+        _drop_index(
+            session_maker, "statistics", "ix_statistics_statistic_id_start", quiet=True
+        )
         _drop_index(
             session_maker,
             "statistics_short_term",
             "ix_statistics_short_term_statistic_id_start",
+            quiet=True,
         )
         # ix_statistics_start and ix_statistics_statistic_id_start are still used
         # for the post migration cleanup and can be removed in a future version.
@@ -994,6 +1007,40 @@ def _apply_update(  # noqa: C901
         _add_columns(session_maker, "states", [f"metadata_id {big_int}"])
         _create_index(session_maker, "states", "ix_states_metadata_id")
         _create_index(session_maker, "states", "ix_states_metadata_id_last_updated_ts")
+    elif new_version == 39:
+        # Dropping indexes with PostgreSQL never worked correctly if there was a prefix
+        # so we need to clean up leftover indexes.
+        _drop_index(
+            session_maker, "events", "ix_events_event_type_time_fired_ts", quiet=True
+        )
+        _drop_index(session_maker, "events", "ix_events_event_type", quiet=True)
+        _drop_index(
+            session_maker, "events", "ix_events_event_type_time_fired", quiet=True
+        )
+        _drop_index(session_maker, "events", "ix_events_time_fired", quiet=True)
+        _drop_index(session_maker, "events", "ix_events_context_user_id", quiet=True)
+        _drop_index(session_maker, "events", "ix_events_context_parent_id", quiet=True)
+        _drop_index(
+            session_maker, "states", "ix_states_entity_id_last_updated", quiet=True
+        )
+        _drop_index(session_maker, "states", "ix_states_last_updated", quiet=True)
+        _drop_index(session_maker, "states", "ix_states_entity_id", quiet=True)
+        _drop_index(session_maker, "states", "ix_states_context_user_id", quiet=True)
+        _drop_index(session_maker, "states", "ix_states_context_parent_id", quiet=True)
+        _drop_index(session_maker, "states", "ix_states_created_domain", quiet=True)
+        _drop_index(session_maker, "states", "ix_states_entity_id_created", quiet=True)
+        _drop_index(session_maker, "states", "states__state_changes", quiet=True)
+        _drop_index(session_maker, "states", "states__significant_changes", quiet=True)
+        _drop_index(session_maker, "states", "ix_states_entity_id_created", quiet=True)
+        _drop_index(
+            session_maker, "statistics", "ix_statistics_statistic_id_start", quiet=True
+        )
+        _drop_index(
+            session_maker,
+            "statistics_short_term",
+            "ix_statistics_short_term_statistic_id_start",
+            quiet=True,
+        )
     else:
         raise ValueError(f"No schema migration defined for version {new_version}")
@@ -1297,8 +1344,8 @@ def migrate_context_ids(instance: Recorder) -> bool:

     is_done = not (events or states)
     if is_done:
-        _drop_index(session_maker, "events", "ix_events_context_id", quiet=True)
-        _drop_index(session_maker, "states", "ix_states_context_id", quiet=True)
+        _drop_index(session_maker, "events", "ix_events_context_id")
+        _drop_index(session_maker, "states", "ix_states_context_id")

     _LOGGER.debug("Migrating context_ids to binary format: done=%s", is_done)
     return is_done

View File

@@ -7,7 +7,7 @@ import sys
 from unittest.mock import patch

 import pytest
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, inspect
 from sqlalchemy.orm import Session

 from homeassistant.components import recorder
@@ -171,5 +171,17 @@ async def test_migrate_times(caplog: pytest.LogCaptureFixture, tmpdir) -> None:
     assert states_result[0].last_changed_ts == one_second_past_timestamp
     assert states_result[0].last_updated_ts == now_timestamp

+    def _get_events_index_names():
+        with session_scope(hass=hass) as session:
+            return inspect(session.connection()).get_indexes("events")
+
+    indexes = await recorder.get_instance(hass).async_add_executor_job(
+        _get_events_index_names
+    )
+    index_names = {index["name"] for index in indexes}
+    assert "ix_events_context_id_bin" in index_names
+    assert "ix_events_context_id" not in index_names
+
     await hass.async_stop()
     dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
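
The new assertions lean on the shape of SQLAlchemy's index reflection: inspect(connection).get_indexes(table) returns a list of dicts whose "name" key holds the index name exactly as the engine stores it, which is what makes the suffix matching and these membership checks possible. A quick way to see that output outside the test suite, where the sqlite:///example.db URL and the events table are placeholders for whatever database you point it at:

from sqlalchemy import create_engine, inspect

engine = create_engine("sqlite:///example.db")  # placeholder URL
with engine.connect() as connection:
    for index in inspect(connection).get_indexes("events"):
        # Each entry is a dict along the lines of:
        # {"name": "ix_events_context_id_bin", "column_names": [...], "unique": False}
        print(index["name"], index["column_names"], index["unique"])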