Speed up purge time with newer MariaDB versions (#87409)

* Speed up purge time with newer MariaDB versions

* fix

* document

* document

* document

* rename

* self review

* Update homeassistant/components/recorder/util.py

* fixes
Author: J. Nick Koston, 2023-02-05 16:58:34 -06:00 (committed by GitHub)
Parent: 49d7bbe55d
Commit: c11f3ffa17
7 changed files with 169 additions and 87 deletions

homeassistant/components/recorder/core.py

@@ -14,7 +14,6 @@ import time
 from typing import Any, TypeVar, cast
 
 import async_timeout
-from awesomeversion import AwesomeVersion
 from lru import LRU  # pylint: disable=no-name-in-module
 from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
 from sqlalchemy.engine import Engine
@@ -67,6 +66,7 @@ from .db_schema import (
 )
 from .executor import DBInterruptibleThreadPoolExecutor
 from .models import (
+    DatabaseEngine,
     StatisticData,
     StatisticMetaData,
     UnsupportedDialect,
@@ -173,7 +173,7 @@ class Recorder(threading.Thread):
         self.db_url = uri
         self.db_max_retries = db_max_retries
         self.db_retry_wait = db_retry_wait
-        self.engine_version: AwesomeVersion | None = None
+        self.database_engine: DatabaseEngine | None = None
         # Database connection is ready, but non-live migration may be in progress
         db_connected: asyncio.Future[bool] = hass.data[DOMAIN].db_connected
         self.async_db_connected: asyncio.Future[bool] = db_connected
@@ -1125,13 +1125,13 @@
     ) -> None:
         """Dbapi specific connection settings."""
         assert self.engine is not None
-        if version := setup_connection_for_dialect(
+        if database_engine := setup_connection_for_dialect(
             self,
             self.engine.dialect.name,
             dbapi_connection,
             not self._completed_first_database_setup,
         ):
-            self.engine_version = version
+            self.database_engine = database_engine
         self._completed_first_database_setup = True
 
         if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url:

homeassistant/components/recorder/models.py

@@ -1,10 +1,12 @@
 """Models for Recorder."""
 from __future__ import annotations
 
+from dataclasses import dataclass
 from datetime import datetime, timedelta
 import logging
 from typing import Any, Literal, TypedDict, overload
 
+from awesomeversion import AwesomeVersion
 from sqlalchemy.engine.row import Row
 
 from homeassistant.const import (
@@ -17,6 +19,8 @@ from homeassistant.core import Context, State
 from homeassistant.helpers.json import json_loads
 import homeassistant.util.dt as dt_util
 
+from .const import SupportedDialect
+
 # pylint: disable=invalid-name
 _LOGGER = logging.getLogger(__name__)
@@ -425,3 +429,27 @@ class StatisticPeriod(TypedDict, total=False):
     calendar: CalendarStatisticPeriod
     fixed_period: FixedStatisticPeriod
     rolling_window: RollingWindowStatisticPeriod
+
+
+@dataclass
+class DatabaseEngine:
+    """Properties of the database engine."""
+
+    dialect: SupportedDialect
+    optimizer: DatabaseOptimizer
+    version: AwesomeVersion | None
+
+
+@dataclass
+class DatabaseOptimizer:
+    """Properties of the database optimizer for the configured database engine."""
+
+    # Some MariaDB versions have a bug that causes a slow query when using
+    # a range in a select statement with an IN clause.
+    #
+    # https://jira.mariadb.org/browse/MDEV-25020
+    #
+    # Historically, we have applied this logic to PostgreSQL as well, but
+    # it may not be necessary. We should revisit this in the future
+    # when we have more data.
+    slow_range_in_select: bool
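
A minimal sketch of how the new dataclasses fit together (the values below are hypothetical, not from this commit): setup_connection_for_dialect builds a DatabaseEngine once on the first connection, and the purge helpers read optimizer.slow_range_in_select instead of checking the dialect directly.

from awesomeversion import AwesomeVersion

from homeassistant.components.recorder.const import SupportedDialect
from homeassistant.components.recorder.models import DatabaseEngine, DatabaseOptimizer

# Hypothetical engine as it would be built for an affected MariaDB 10.6.x
engine = DatabaseEngine(
    dialect=SupportedDialect.MYSQL,
    version=AwesomeVersion("10.6.5"),
    optimizer=DatabaseOptimizer(slow_range_in_select=True),
)

# Purge code branches on the capability flag, not on the dialect name
use_fast_distinct = not engine.optimizer.slow_range_in_select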

homeassistant/components/recorder/purge.py

@@ -14,13 +14,14 @@ from sqlalchemy.sql.expression import distinct
 from homeassistant.const import EVENT_STATE_CHANGED
 import homeassistant.util.dt as dt_util
 
-from .const import MAX_ROWS_TO_PURGE, SupportedDialect
+from .const import MAX_ROWS_TO_PURGE
 from .db_schema import Events, StateAttributes, States
+from .models import DatabaseEngine
 from .queries import (
     attributes_ids_exist_in_states,
-    attributes_ids_exist_in_states_sqlite,
+    attributes_ids_exist_in_states_with_fast_in_distinct,
     data_ids_exist_in_events,
-    data_ids_exist_in_events_sqlite,
+    data_ids_exist_in_events_with_fast_in_distinct,
     delete_event_data_rows,
     delete_event_rows,
     delete_recorder_runs_rows,
@@ -83,8 +84,6 @@ def purge_old_data(
         "Purging states and events before target %s",
         purge_before.isoformat(sep=" ", timespec="seconds"),
     )
-    using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
-
     with session_scope(session=instance.get_session()) as session:
         # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
         has_more_to_purge = False
@@ -93,9 +92,7 @@ def purge_old_data(
                 "Purge running in legacy format as there are states with event_id"
                 " remaining"
             )
-            has_more_to_purge |= _purge_legacy_format(
-                instance, session, purge_before, using_sqlite
-            )
+            has_more_to_purge |= _purge_legacy_format(instance, session, purge_before)
         else:
             _LOGGER.debug(
                 "Purge running in new format as there are NO states with event_id"
@@ -103,10 +100,10 @@ def purge_old_data(
             )
             # Once we are done purging legacy rows, we use the new method
             has_more_to_purge |= _purge_states_and_attributes_ids(
-                instance, session, states_batch_size, purge_before, using_sqlite
+                instance, session, states_batch_size, purge_before
             )
             has_more_to_purge |= _purge_events_and_data_ids(
-                instance, session, events_batch_size, purge_before, using_sqlite
+                instance, session, events_batch_size, purge_before
             )
 
         statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
@@ -140,7 +137,7 @@ def _purging_legacy_format(session: Session) -> bool:
 
 def _purge_legacy_format(
-    instance: Recorder, session: Session, purge_before: datetime, using_sqlite: bool
+    instance: Recorder, session: Session, purge_before: datetime
 ) -> bool:
     """Purge rows that are still linked by the event_ids."""
     (
@@ -153,10 +150,10 @@ def _purge_legacy_format(
     )
     if state_ids:
         _purge_state_ids(instance, session, state_ids)
-        _purge_unused_attributes_ids(instance, session, attributes_ids, using_sqlite)
+        _purge_unused_attributes_ids(instance, session, attributes_ids)
     if event_ids:
         _purge_event_ids(session, event_ids)
-        _purge_unused_data_ids(instance, session, data_ids, using_sqlite)
+        _purge_unused_data_ids(instance, session, data_ids)
     return bool(event_ids or state_ids or attributes_ids or data_ids)
@@ -165,12 +162,13 @@ def _purge_states_and_attributes_ids(
     session: Session,
     states_batch_size: int,
     purge_before: datetime,
-    using_sqlite: bool,
 ) -> bool:
     """Purge states and linked attributes id in a batch.
 
     Returns true if there are more states to purge.
     """
+    database_engine = instance.database_engine
+    assert database_engine is not None
     has_remaining_state_ids_to_purge = True
     # There are more states relative to attributes_ids so
     # we purge enough state_ids to try to generate a full
@@ -187,7 +185,7 @@ def _purge_states_and_attributes_ids(
             _purge_state_ids(instance, session, state_ids)
         attributes_ids_batch = attributes_ids_batch | attributes_ids
 
-    _purge_unused_attributes_ids(instance, session, attributes_ids_batch, using_sqlite)
+    _purge_unused_attributes_ids(instance, session, attributes_ids_batch)
     _LOGGER.debug(
         "After purging states and attributes_ids remaining=%s",
         has_remaining_state_ids_to_purge,
@@ -200,7 +198,6 @@ def _purge_events_and_data_ids(
     session: Session,
     events_batch_size: int,
     purge_before: datetime,
-    using_sqlite: bool,
 ) -> bool:
     """Purge states and linked attributes id in a batch.
@@ -220,7 +217,7 @@ def _purge_events_and_data_ids(
             _purge_event_ids(session, event_ids)
         data_ids_batch = data_ids_batch | data_ids
 
-    _purge_unused_data_ids(instance, session, data_ids_batch, using_sqlite)
+    _purge_unused_data_ids(instance, session, data_ids_batch)
     _LOGGER.debug(
         "After purging event and data_ids remaining=%s",
         has_remaining_event_ids_to_purge,
@@ -267,13 +264,13 @@ def _select_event_data_ids_to_purge(
 
 def _select_unused_attributes_ids(
-    session: Session, attributes_ids: set[int], using_sqlite: bool
+    session: Session, attributes_ids: set[int], database_engine: DatabaseEngine
 ) -> set[int]:
     """Return a set of attributes ids that are not used by any states in the db."""
     if not attributes_ids:
         return set()
 
-    if using_sqlite:
+    if not database_engine.optimizer.slow_range_in_select:
         #
         # SQLite has a superior query optimizer for the distinct query below as it uses
         # the covering index without having to examine the rows directly for both of the
@@ -290,7 +287,7 @@ def _select_unused_attributes_ids(
         seen_ids = {
             state[0]
             for state in session.execute(
-                attributes_ids_exist_in_states_sqlite(attributes_ids)
+                attributes_ids_exist_in_states_with_fast_in_distinct(attributes_ids)
             ).all()
         }
     else:
@@ -340,16 +337,18 @@ def _purge_unused_attributes_ids(
     instance: Recorder,
     session: Session,
     attributes_ids_batch: set[int],
-    using_sqlite: bool,
 ) -> None:
+    """Purge unused attributes ids."""
+    database_engine = instance.database_engine
+    assert database_engine is not None
     if unused_attribute_ids_set := _select_unused_attributes_ids(
-        session, attributes_ids_batch, using_sqlite
+        session, attributes_ids_batch, database_engine
    ):
         _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
 
 
 def _select_unused_event_data_ids(
-    session: Session, data_ids: set[int], using_sqlite: bool
+    session: Session, data_ids: set[int], database_engine: DatabaseEngine
 ) -> set[int]:
     """Return a set of event data ids that are not used by any events in the db."""
     if not data_ids:
@@ -357,11 +356,11 @@ def _select_unused_event_data_ids(
 
     # See _select_unused_attributes_ids for why this function
     # branches for non-sqlite databases.
-    if using_sqlite:
+    if not database_engine.optimizer.slow_range_in_select:
         seen_ids = {
             state[0]
             for state in session.execute(
-                data_ids_exist_in_events_sqlite(data_ids)
+                data_ids_exist_in_events_with_fast_in_distinct(data_ids)
             ).all()
         }
     else:
@@ -381,10 +380,12 @@ def _select_unused_event_data_ids(
 
 def _purge_unused_data_ids(
-    instance: Recorder, session: Session, data_ids_batch: set[int], using_sqlite: bool
+    instance: Recorder, session: Session, data_ids_batch: set[int]
 ) -> None:
+    database_engine = instance.database_engine
+    assert database_engine is not None
     if unused_data_ids_set := _select_unused_event_data_ids(
-        session, data_ids_batch, using_sqlite
+        session, data_ids_batch, database_engine
     ):
         _purge_batch_data_ids(instance, session, unused_data_ids_set)
@@ -582,7 +583,8 @@ def _purge_old_recorder_runs(
 def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
     """Remove filtered states and events that shouldn't be in the database."""
     _LOGGER.debug("Cleanup filtered data")
-    using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
+    database_engine = instance.database_engine
+    assert database_engine is not None
 
     # Check if excluded entity_ids are in database
     excluded_entity_ids: list[str] = [
@@ -591,7 +593,7 @@ def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
         if not instance.entity_filter(entity_id)
     ]
     if len(excluded_entity_ids) > 0:
-        _purge_filtered_states(instance, session, excluded_entity_ids, using_sqlite)
+        _purge_filtered_states(instance, session, excluded_entity_ids, database_engine)
         return False
 
     # Check if excluded event_types are in database
@@ -611,7 +613,7 @@ def _purge_filtered_states(
     instance: Recorder,
     session: Session,
     excluded_entity_ids: list[str],
-    using_sqlite: bool,
+    database_engine: DatabaseEngine,
 ) -> None:
     """Remove filtered states and linked events."""
     state_ids: list[int]
@@ -632,7 +634,7 @@ def _purge_filtered_states(
     _purge_state_ids(instance, session, set(state_ids))
     _purge_event_ids(session, event_ids)
     unused_attribute_ids_set = _select_unused_attributes_ids(
-        session, {id_ for id_ in attributes_ids if id_ is not None}, using_sqlite
+        session, {id_ for id_ in attributes_ids if id_ is not None}, database_engine
     )
     _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
@@ -641,7 +643,8 @@ def _purge_filtered_events(
     instance: Recorder, session: Session, excluded_event_types: list[str]
 ) -> None:
     """Remove filtered events and linked states."""
-    using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
+    database_engine = instance.database_engine
+    assert database_engine is not None
     event_ids, data_ids = zip(
         *(
             session.query(Events.event_id, Events.data_id)
@@ -660,7 +663,7 @@ def _purge_filtered_events(
         _purge_state_ids(instance, session, state_ids)
     _purge_event_ids(session, event_ids)
     if unused_data_ids_set := _select_unused_event_data_ids(
-        session, set(data_ids), using_sqlite
+        session, set(data_ids), database_engine
     ):
         _purge_batch_data_ids(instance, session, unused_data_ids_set)
     if EVENT_STATE_CHANGED in excluded_event_types:
@@ -671,7 +674,8 @@ def _purge_filtered_events(
 @retryable_database_job("purge")
 def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool]) -> bool:
     """Purge states and events of specified entities."""
-    using_sqlite = instance.dialect_name == SupportedDialect.SQLITE
+    database_engine = instance.database_engine
+    assert database_engine is not None
     with session_scope(session=instance.get_session()) as session:
         selected_entity_ids: list[str] = [
             entity_id
@@ -682,7 +686,9 @@ def purge_entity_data(instance: Recorder, entity_filter: Callable[[str], bool])
         if len(selected_entity_ids) > 0:
             # Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states
             # or events record.
-            _purge_filtered_states(instance, session, selected_entity_ids, using_sqlite)
+            _purge_filtered_states(
+                instance, session, selected_entity_ids, database_engine
+            )
             _LOGGER.debug("Purging entity data hasn't fully completed yet")
             return False
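
For reference, the two query shapes that slow_range_in_select chooses between look roughly like this, rebuilt standalone with SQLAlchemy Core (the table definition is simplified here; the real statements live in queries.py):

from sqlalchemy import Column, Integer, MetaData, Table, func, select, union_all

metadata = MetaData()
states = Table("states", metadata, Column("attributes_id", Integer, index=True))

attributes_ids = [1, 2, 3]

# Fast path (slow_range_in_select=False): one DISTINCT scan that can be
# answered from the covering index.
fast = select(states.c.attributes_id).distinct().where(
    states.c.attributes_id.in_(attributes_ids)
)

# Fallback path (slow_range_in_select=True): one point lookup per id,
# mirroring _state_attrs_exist and avoiding the MariaDB range-in-select
# regression (MDEV-25020).
slow = union_all(
    *(
        select(func.min(states.c.attributes_id)).where(
            states.c.attributes_id == attributes_id
        )
        for attributes_id in attributes_ids
    )
)

print(fast)  # SELECT DISTINCT states.attributes_id ... WHERE attributes_id IN (...)
print(slow)  # SELECT min(states.attributes_id) ... UNION ALL SELECT min(...) ...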

homeassistant/components/recorder/queries.py

@@ -45,7 +45,7 @@ def _state_attrs_exist(attr: int | None) -> Select:
     return select(func.min(States.attributes_id)).where(States.attributes_id == attr)
 
 
-def attributes_ids_exist_in_states_sqlite(
+def attributes_ids_exist_in_states_with_fast_in_distinct(
     attributes_ids: Iterable[int],
 ) -> StatementLambdaElement:
     """Find attributes ids that exist in the states table."""
@@ -268,7 +268,7 @@ def attributes_ids_exist_in_states(
     )
 
 
-def data_ids_exist_in_events_sqlite(
+def data_ids_exist_in_events_with_fast_in_distinct(
     data_ids: Iterable[int],
 ) -> StatementLambdaElement:
     """Find data ids that exist in the events table."""

homeassistant/components/recorder/system_health/__init__.py

@@ -49,8 +49,8 @@ def _async_get_db_engine_info(instance: Recorder) -> dict[str, Any]:
     db_engine_info: dict[str, Any] = {}
     if dialect_name := instance.dialect_name:
         db_engine_info["database_engine"] = dialect_name.value
-    if engine_version := instance.engine_version:
-        db_engine_info["database_version"] = str(engine_version)
+    if database_engine := instance.database_engine:
+        db_engine_info["database_version"] = str(database_engine.version)
     return db_engine_info

homeassistant/components/recorder/util.py

@@ -36,7 +36,13 @@ from .db_schema import (
     TABLES_TO_CHECK,
     RecorderRuns,
 )
-from .models import StatisticPeriod, UnsupportedDialect, process_timestamp
+from .models import (
+    DatabaseEngine,
+    DatabaseOptimizer,
+    StatisticPeriod,
+    UnsupportedDialect,
+    process_timestamp,
+)
 
 if TYPE_CHECKING:
     from . import Recorder
@@ -51,44 +57,33 @@ QUERY_RETRY_WAIT = 0.1
 SQLITE3_POSTFIXES = ["", "-wal", "-shm"]
 DEFAULT_YIELD_STATES_ROWS = 32768
 
+
 # Our minimum versions for each database
 #
 # Older MariaDB suffers https://jira.mariadb.org/browse/MDEV-25020
 # which is fixed in 10.5.17, 10.6.9, 10.7.5, 10.8.4
 #
-MIN_VERSION_MARIA_DB = AwesomeVersion(
-    "10.3.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-RECOMMENDED_MIN_VERSION_MARIA_DB = AwesomeVersion(
-    "10.5.17", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-MARIA_DB_106 = AwesomeVersion(
-    "10.6.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-RECOMMENDED_MIN_VERSION_MARIA_DB_106 = AwesomeVersion(
-    "10.6.9", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-MARIA_DB_107 = AwesomeVersion(
-    "10.7.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-RECOMMENDED_MIN_VERSION_MARIA_DB_107 = AwesomeVersion(
-    "10.7.5", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-MARIA_DB_108 = AwesomeVersion(
-    "10.8.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-RECOMMENDED_MIN_VERSION_MARIA_DB_108 = AwesomeVersion(
-    "10.8.4", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-MIN_VERSION_MYSQL = AwesomeVersion(
-    "8.0.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-MIN_VERSION_PGSQL = AwesomeVersion(
-    "12.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
-MIN_VERSION_SQLITE = AwesomeVersion(
-    "3.31.0", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
-)
+def _simple_version(version: str) -> AwesomeVersion:
+    """Return a simple version."""
+    return AwesomeVersion(version, ensure_strategy=AwesomeVersionStrategy.SIMPLEVER)
+
+
+MIN_VERSION_MARIA_DB = _simple_version("10.3.0")
+RECOMMENDED_MIN_VERSION_MARIA_DB = _simple_version("10.5.17")
+MARIADB_WITH_FIXED_IN_QUERIES_105 = _simple_version("10.5.17")
+MARIA_DB_106 = _simple_version("10.6.0")
+MARIADB_WITH_FIXED_IN_QUERIES_106 = _simple_version("10.6.9")
+RECOMMENDED_MIN_VERSION_MARIA_DB_106 = _simple_version("10.6.9")
+MARIA_DB_107 = _simple_version("10.7.0")
+RECOMMENDED_MIN_VERSION_MARIA_DB_107 = _simple_version("10.7.5")
+MARIADB_WITH_FIXED_IN_QUERIES_107 = _simple_version("10.7.5")
+MARIA_DB_108 = _simple_version("10.8.0")
+RECOMMENDED_MIN_VERSION_MARIA_DB_108 = _simple_version("10.8.4")
+MARIADB_WITH_FIXED_IN_QUERIES_108 = _simple_version("10.8.4")
+MIN_VERSION_MYSQL = _simple_version("8.0.0")
+MIN_VERSION_PGSQL = _simple_version("12.0")
+MIN_VERSION_SQLITE = _simple_version("3.31.0")
 
 # This is the maximum time after the recorder ends the session
 # before we no longer consider startup to be a "restart" and we
@@ -469,10 +464,12 @@ def setup_connection_for_dialect(
     dialect_name: str,
     dbapi_connection: Any,
     first_connection: bool,
-) -> AwesomeVersion | None:
+) -> DatabaseEngine | None:
     """Execute statements needed for dialect connection."""
     version: AwesomeVersion | None = None
+    slow_range_in_select = True
     if dialect_name == SupportedDialect.SQLITE:
+        slow_range_in_select = False
         if first_connection:
             old_isolation = dbapi_connection.isolation_level
             dbapi_connection.isolation_level = None
@@ -538,7 +535,19 @@ def setup_connection_for_dialect(
                     version or version_string, "MySQL", MIN_VERSION_MYSQL
                 )
 
+            slow_range_in_select = bool(
+                not version
+                or version < MARIADB_WITH_FIXED_IN_QUERIES_105
+                or MARIA_DB_106 <= version < MARIADB_WITH_FIXED_IN_QUERIES_106
+                or MARIA_DB_107 <= version < MARIADB_WITH_FIXED_IN_QUERIES_107
+                or MARIA_DB_108 <= version < MARIADB_WITH_FIXED_IN_QUERIES_108
+            )
     elif dialect_name == SupportedDialect.POSTGRESQL:
+        # Historically we have marked PostgreSQL as having slow range in select
+        # but this may not be true for all versions. We should investigate
+        # this further when we have more data and remove this if possible
+        # in the future so we can use the simpler purge SQL query for
+        # _select_unused_attributes_ids and _select_unused_events_ids
         if first_connection:
             # server_version_num was added in 2006
             result = query_on_connection(dbapi_connection, "SHOW server_version")
@@ -552,7 +561,14 @@ def setup_connection_for_dialect(
     else:
         _fail_unsupported_dialect(dialect_name)
 
-    return version
+    if not first_connection:
+        return None
+
+    return DatabaseEngine(
+        dialect=SupportedDialect(dialect_name),
+        version=version,
+        optimizer=DatabaseOptimizer(slow_range_in_select=slow_range_in_select),
+    )
 
 
 def end_incomplete_runs(session: Session, start_time: datetime) -> None:
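
A quick standalone check of how the MariaDB version windows evaluate (illustrative only; the local slow_range_in_select helper below reproduces the boolean from the diff above):

from __future__ import annotations

from awesomeversion import AwesomeVersion, AwesomeVersionStrategy


def _simple_version(version: str) -> AwesomeVersion:
    return AwesomeVersion(version, ensure_strategy=AwesomeVersionStrategy.SIMPLEVER)


MARIADB_WITH_FIXED_IN_QUERIES_105 = _simple_version("10.5.17")
MARIA_DB_106 = _simple_version("10.6.0")
MARIADB_WITH_FIXED_IN_QUERIES_106 = _simple_version("10.6.9")
MARIA_DB_107 = _simple_version("10.7.0")
MARIADB_WITH_FIXED_IN_QUERIES_107 = _simple_version("10.7.5")
MARIA_DB_108 = _simple_version("10.8.0")
MARIADB_WITH_FIXED_IN_QUERIES_108 = _simple_version("10.8.4")


def slow_range_in_select(version: AwesomeVersion | None) -> bool:
    # Unknown versions keep the workaround; each affected branch clears
    # it once the server is at or past the release that fixed MDEV-25020.
    return bool(
        not version
        or version < MARIADB_WITH_FIXED_IN_QUERIES_105
        or MARIA_DB_106 <= version < MARIADB_WITH_FIXED_IN_QUERIES_106
        or MARIA_DB_107 <= version < MARIADB_WITH_FIXED_IN_QUERIES_107
        or MARIA_DB_108 <= version < MARIADB_WITH_FIXED_IN_QUERIES_108
    )


assert slow_range_in_select(_simple_version("10.6.5")) is True    # affected 10.6.x
assert slow_range_in_select(_simple_version("10.6.9")) is False   # fixed 10.6.x
assert slow_range_in_select(_simple_version("10.5.17")) is False  # fixed 10.5.x
assert slow_range_in_select(_simple_version("10.9.2")) is False   # branch after 10.8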

tests/components/recorder/test_util.py

@@ -230,7 +230,12 @@ def test_setup_connection_for_dialect_sqlite(sqlite_version):
 
     dbapi_connection = MagicMock(cursor=_make_cursor_mock)
 
-    util.setup_connection_for_dialect(instance_mock, "sqlite", dbapi_connection, True)
+    assert (
+        util.setup_connection_for_dialect(
+            instance_mock, "sqlite", dbapi_connection, True
+        )
+        is not None
+    )
 
     assert len(execute_args) == 5
     assert execute_args[0] == "PRAGMA journal_mode=WAL"
@@ -240,7 +245,12 @@ def test_setup_connection_for_dialect_sqlite(sqlite_version):
     assert execute_args[4] == "PRAGMA foreign_keys=ON"
 
     execute_args = []
-    util.setup_connection_for_dialect(instance_mock, "sqlite", dbapi_connection, False)
+    assert (
+        util.setup_connection_for_dialect(
+            instance_mock, "sqlite", dbapi_connection, False
+        )
+        is None
+    )
 
     assert len(execute_args) == 3
     assert execute_args[0] == "PRAGMA cache_size = -16384"
@@ -275,7 +285,12 @@ def test_setup_connection_for_dialect_sqlite_zero_commit_interval(
 
     dbapi_connection = MagicMock(cursor=_make_cursor_mock)
 
-    util.setup_connection_for_dialect(instance_mock, "sqlite", dbapi_connection, True)
+    assert (
+        util.setup_connection_for_dialect(
+            instance_mock, "sqlite", dbapi_connection, True
+        )
+        is not None
+    )
 
     assert len(execute_args) == 5
     assert execute_args[0] == "PRAGMA journal_mode=WAL"
@@ -285,7 +300,12 @@ def test_setup_connection_for_dialect_sqlite_zero_commit_interval(
     assert execute_args[4] == "PRAGMA foreign_keys=ON"
 
     execute_args = []
-    util.setup_connection_for_dialect(instance_mock, "sqlite", dbapi_connection, False)
+    assert (
+        util.setup_connection_for_dialect(
+            instance_mock, "sqlite", dbapi_connection, False
+        )
+        is None
+    )
 
     assert len(execute_args) == 3
     assert execute_args[0] == "PRAGMA cache_size = -16384"
@@ -443,11 +463,13 @@ def test_supported_pgsql(caplog, pgsql_version):
 
     dbapi_connection = MagicMock(cursor=_make_cursor_mock)
 
-    util.setup_connection_for_dialect(
+    database_engine = util.setup_connection_for_dialect(
         instance_mock, "postgresql", dbapi_connection, True
     )
 
     assert "minimum supported version" not in caplog.text
+    assert database_engine is not None
+    assert database_engine.optimizer.slow_range_in_select is True
 
 
 @pytest.mark.parametrize(
@@ -524,9 +546,13 @@ def test_supported_sqlite(caplog, sqlite_version):
 
     dbapi_connection = MagicMock(cursor=_make_cursor_mock)
 
-    util.setup_connection_for_dialect(instance_mock, "sqlite", dbapi_connection, True)
+    database_engine = util.setup_connection_for_dialect(
+        instance_mock, "sqlite", dbapi_connection, True
+    )
 
     assert "minimum supported version" not in caplog.text
+    assert database_engine is not None
+    assert database_engine.optimizer.slow_range_in_select is False
 
 
 @pytest.mark.parametrize(
@@ -598,7 +624,7 @@ async def test_issue_for_mariadb_with_MDEV_25020(
 
     dbapi_connection = MagicMock(cursor=_make_cursor_mock)
 
-    await hass.async_add_executor_job(
+    database_engine = await hass.async_add_executor_job(
         util.setup_connection_for_dialect,
         instance_mock,
         "mysql",
@@ -612,6 +638,9 @@ async def test_issue_for_mariadb_with_MDEV_25020(
     assert issue is not None
     assert issue.translation_placeholders == {"min_version": min_version}
 
+    assert database_engine is not None
+    assert database_engine.optimizer.slow_range_in_select is True
+
 
 @pytest.mark.parametrize(
     "mysql_version",
@@ -648,7 +677,7 @@ async def test_no_issue_for_mariadb_with_MDEV_25020(hass, caplog, mysql_version)
 
     dbapi_connection = MagicMock(cursor=_make_cursor_mock)
 
-    await hass.async_add_executor_job(
+    database_engine = await hass.async_add_executor_job(
         util.setup_connection_for_dialect,
         instance_mock,
         "mysql",
@@ -661,6 +690,9 @@ async def test_no_issue_for_mariadb_with_MDEV_25020(hass, caplog, mysql_version)
     issue = registry.async_get_issue(DOMAIN, "maria_db_range_index_regression")
     assert issue is None
 
+    assert database_engine is not None
+    assert database_engine.optimizer.slow_range_in_select is False
+
 
 def test_basic_sanity_check(hass_recorder, recorder_db_url):
     """Test the basic sanity checks with a missing table."""