Break out statistics repairs into an auto_repairs module (#90068)

* Break out statistics schema repairs into a repairs module

A future PR will add repairs for events, states, etc

* reorg

* reorg

* reorg

* reorg

* fixes

* fix patch targets

* namespace rename
J. Nick Koston 2023-03-21 15:08:06 -10:00 committed by GitHub
parent 086bcfb2fc
commit ddcaa9d372
12 changed files with 1157 additions and 1045 deletions
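For orientation, the diffs below move the statistics duplicate- and schema-repair helpers out of recorder/statistics.py into a new auto_repairs package (auto_repairs/statistics/duplicates.py and auto_repairs/statistics/schema.py). Callers only need to switch import paths; a minimal before/after sketch, mirroring the migration.py hunk further down:

# Before: repair helpers were imported from recorder/statistics.py
from .statistics import (
    correct_db_schema as statistics_correct_db_schema,
    delete_statistics_duplicates,
    delete_statistics_meta_duplicates,
    validate_db_schema as statistics_validate_db_schema,
)

# After: the same helpers live in the auto_repairs package
from .auto_repairs.statistics.duplicates import (
    delete_statistics_duplicates,
    delete_statistics_meta_duplicates,
)
from .auto_repairs.statistics.schema import (
    correct_db_schema as statistics_correct_db_schema,
    validate_db_schema as statistics_validate_db_schema,
)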


@@ -0,0 +1 @@
"""Repairs for Recorder."""


@@ -0,0 +1 @@
"""Statistics repairs for Recorder."""


@@ -0,0 +1,261 @@
"""Statistics duplication repairs."""
from __future__ import annotations
import json
import logging
import os
from typing import TYPE_CHECKING
from sqlalchemy import func
from sqlalchemy.engine.row import Row
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import literal_column
from homeassistant.core import HomeAssistant
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.util import dt as dt_util
from ...const import SQLITE_MAX_BIND_VARS
from ...db_schema import Statistics, StatisticsBase, StatisticsMeta, StatisticsShortTerm
from ...util import database_job_retry_wrapper, execute
if TYPE_CHECKING:
from ... import Recorder
_LOGGER = logging.getLogger(__name__)
def _find_duplicates(
session: Session, table: type[StatisticsBase]
) -> tuple[list[int], list[dict]]:
"""Find duplicated statistics."""
subquery = (
session.query(
table.start,
table.metadata_id,
literal_column("1").label("is_duplicate"),
)
.group_by(table.metadata_id, table.start)
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
# pylint: disable-next=not-callable
.having(func.count() > 1)
.subquery()
)
query = (
session.query(
table.id,
table.metadata_id,
table.created,
table.start,
table.mean,
table.min,
table.max,
table.last_reset,
table.state,
table.sum,
)
.outerjoin(
subquery,
(subquery.c.metadata_id == table.metadata_id)
& (subquery.c.start == table.start),
)
.filter(subquery.c.is_duplicate == 1)
.order_by(table.metadata_id, table.start, table.id.desc())
.limit(1000 * SQLITE_MAX_BIND_VARS)
)
duplicates = execute(query)
original_as_dict = {}
start = None
metadata_id = None
duplicate_ids: list[int] = []
non_identical_duplicates_as_dict: list[dict] = []
if not duplicates:
return (duplicate_ids, non_identical_duplicates_as_dict)
def columns_to_dict(duplicate: Row) -> dict:
"""Convert a SQLAlchemy row to dict."""
dict_ = {}
for key in (
"id",
"metadata_id",
"start",
"created",
"mean",
"min",
"max",
"last_reset",
"state",
"sum",
):
dict_[key] = getattr(duplicate, key)
return dict_
def compare_statistic_rows(row1: dict, row2: dict) -> bool:
"""Compare two statistics rows, ignoring id and created."""
ignore_keys = {"id", "created"}
keys1 = set(row1).difference(ignore_keys)
keys2 = set(row2).difference(ignore_keys)
return keys1 == keys2 and all(row1[k] == row2[k] for k in keys1)
for duplicate in duplicates:
if start != duplicate.start or metadata_id != duplicate.metadata_id:
original_as_dict = columns_to_dict(duplicate)
start = duplicate.start
metadata_id = duplicate.metadata_id
continue
duplicate_as_dict = columns_to_dict(duplicate)
duplicate_ids.append(duplicate.id)
if not compare_statistic_rows(original_as_dict, duplicate_as_dict):
non_identical_duplicates_as_dict.append(
{"duplicate": duplicate_as_dict, "original": original_as_dict}
)
return (duplicate_ids, non_identical_duplicates_as_dict)
def _delete_duplicates_from_table(
session: Session, table: type[StatisticsBase]
) -> tuple[int, list[dict]]:
"""Identify and delete duplicated statistics from a specified table."""
all_non_identical_duplicates: list[dict] = []
total_deleted_rows = 0
while True:
duplicate_ids, non_identical_duplicates = _find_duplicates(session, table)
if not duplicate_ids:
break
all_non_identical_duplicates.extend(non_identical_duplicates)
for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
deleted_rows = (
session.query(table)
.filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]))
.delete(synchronize_session=False)
)
total_deleted_rows += deleted_rows
return (total_deleted_rows, all_non_identical_duplicates)
@database_job_retry_wrapper("delete statistics duplicates", 3)
def delete_statistics_duplicates(
instance: Recorder, hass: HomeAssistant, session: Session
) -> None:
"""Identify and delete duplicated statistics.
A backup will be made of duplicated statistics before it is deleted.
"""
deleted_statistics_rows, non_identical_duplicates = _delete_duplicates_from_table(
session, Statistics
)
if deleted_statistics_rows:
_LOGGER.info("Deleted %s duplicated statistics rows", deleted_statistics_rows)
if non_identical_duplicates:
isotime = dt_util.utcnow().isoformat()
backup_file_name = f"deleted_statistics.{isotime}.json"
backup_path = hass.config.path(STORAGE_DIR, backup_file_name)
os.makedirs(os.path.dirname(backup_path), exist_ok=True)
with open(backup_path, "w", encoding="utf8") as backup_file:
json.dump(
non_identical_duplicates,
backup_file,
indent=4,
sort_keys=True,
cls=JSONEncoder,
)
_LOGGER.warning(
(
"Deleted %s non identical duplicated %s rows, a backup of the deleted"
" rows has been saved to %s"
),
len(non_identical_duplicates),
Statistics.__tablename__,
backup_path,
)
deleted_short_term_statistics_rows, _ = _delete_duplicates_from_table(
session, StatisticsShortTerm
)
if deleted_short_term_statistics_rows:
_LOGGER.warning(
"Deleted duplicated short term statistic rows, please report at %s",
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
)
def _find_statistics_meta_duplicates(session: Session) -> list[int]:
"""Find duplicated statistics_meta."""
# When querying the database, be careful to only explicitly query for columns
# which were present in schema version 29. If querying the table, SQLAlchemy
# will refer to future columns.
subquery = (
session.query(
StatisticsMeta.statistic_id,
literal_column("1").label("is_duplicate"),
)
.group_by(StatisticsMeta.statistic_id)
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
# pylint: disable-next=not-callable
.having(func.count() > 1)
.subquery()
)
query = (
session.query(StatisticsMeta.statistic_id, StatisticsMeta.id)
.outerjoin(
subquery,
(subquery.c.statistic_id == StatisticsMeta.statistic_id),
)
.filter(subquery.c.is_duplicate == 1)
.order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc())
.limit(1000 * SQLITE_MAX_BIND_VARS)
)
duplicates = execute(query)
statistic_id = None
duplicate_ids: list[int] = []
if not duplicates:
return duplicate_ids
for duplicate in duplicates:
if statistic_id != duplicate.statistic_id:
statistic_id = duplicate.statistic_id
continue
duplicate_ids.append(duplicate.id)
return duplicate_ids
def _delete_statistics_meta_duplicates(session: Session) -> int:
"""Identify and delete duplicated statistics from a specified table."""
total_deleted_rows = 0
while True:
duplicate_ids = _find_statistics_meta_duplicates(session)
if not duplicate_ids:
break
for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
deleted_rows = (
session.query(StatisticsMeta)
.filter(
StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])
)
.delete(synchronize_session=False)
)
total_deleted_rows += deleted_rows
return total_deleted_rows
@database_job_retry_wrapper("delete statistics meta duplicates", 3)
def delete_statistics_meta_duplicates(instance: Recorder, session: Session) -> None:
"""Identify and delete duplicated statistics_meta.
This is used when migrating from schema version 28 to schema version 29.
"""
deleted_statistics_rows = _delete_statistics_meta_duplicates(session)
if deleted_statistics_rows:
statistics_meta_manager = instance.statistics_meta_manager
statistics_meta_manager.reset()
statistics_meta_manager.load(session)
_LOGGER.info(
"Deleted %s duplicated statistics_meta rows", deleted_statistics_rows
)
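The two public entry points above (delete_statistics_duplicates and delete_statistics_meta_duplicates) are what the recorder calls during schema migration. A minimal usage sketch, following the same pattern as the tests added later in this commit (it assumes hass is an already-initialized HomeAssistant instance with the recorder set up):

from homeassistant.components import recorder
from homeassistant.components.recorder.util import session_scope

# Sketch only: mirrors test_delete_duplicates_no_duplicates below.
instance = recorder.get_instance(hass)
with session_scope(hass=hass) as session:
    # Remove duplicated statistics rows (non-identical duplicates are backed up to JSON)
    delete_statistics_duplicates(instance, hass, session)
    # Remove duplicated statistics_meta rows (used when migrating schema 28 -> 29)
    delete_statistics_meta_duplicates(instance, session)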


@@ -0,0 +1,295 @@
"""Statistics schema repairs."""
from __future__ import annotations
from collections.abc import Callable, Mapping
import contextlib
from datetime import datetime
import logging
from typing import TYPE_CHECKING
from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from ...const import DOMAIN, SupportedDialect
from ...db_schema import Statistics, StatisticsShortTerm
from ...models import StatisticData, StatisticMetaData, datetime_to_timestamp_or_none
from ...statistics import (
_import_statistics_with_session,
_statistics_during_period_with_session,
)
from ...util import session_scope
if TYPE_CHECKING:
from ... import Recorder
_LOGGER = logging.getLogger(__name__)
def _validate_db_schema_utf8(
instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
# Lack of full utf8 support is only an issue for MySQL / MariaDB
if instance.dialect_name != SupportedDialect.MYSQL:
return schema_errors
# This name can't be represented unless 4-byte UTF-8 unicode is supported
utf8_name = "𓆚𓃗"
statistic_id = f"{DOMAIN}.db_test"
metadata: StatisticMetaData = {
"has_mean": True,
"has_sum": True,
"name": utf8_name,
"source": DOMAIN,
"statistic_id": statistic_id,
"unit_of_measurement": None,
}
statistics_meta_manager = instance.statistics_meta_manager
# Try inserting some metadata which needs utf8mb4 support
try:
# Mark the session as read_only to ensure that the test data is not committed
# to the database and we always rollback when the scope is exited
with session_scope(session=session_maker(), read_only=True) as session:
old_metadata_dict = statistics_meta_manager.get_many(
session, statistic_ids={statistic_id}
)
try:
statistics_meta_manager.update_or_add(
session, metadata, old_metadata_dict
)
statistics_meta_manager.delete(session, statistic_ids=[statistic_id])
except OperationalError as err:
if err.orig and err.orig.args[0] == 1366:
_LOGGER.debug(
"Database table statistics_meta does not support 4-byte UTF-8"
)
schema_errors.add("statistics_meta.4-byte UTF-8")
session.rollback()
else:
raise
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception("Error when validating DB schema: %s", exc)
return schema_errors
def _get_future_year() -> int:
"""Get a year in the future."""
return datetime.now().year + 1
def _validate_db_schema(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
statistics_meta_manager = instance.statistics_meta_manager
# Wrong precision is only an issue for MySQL / MariaDB / PostgreSQL
if instance.dialect_name not in (
SupportedDialect.MYSQL,
SupportedDialect.POSTGRESQL,
):
return schema_errors
# This number can't be accurately represented as a 32-bit float
precise_number = 1.000000000000001
# This time can't be accurately represented unless datetimes have µs precision
#
# We want to insert statistics for a time in the future, in case they
# have conflicting metadata_id's with existing statistics that were
# never cleaned up. By inserting in the future, we can be sure that
# that by selecting the last inserted row, we will get the one we
# just inserted.
#
future_year = _get_future_year()
precise_time = datetime(future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
start_time = datetime(future_year, 10, 6, tzinfo=dt_util.UTC)
statistic_id = f"{DOMAIN}.db_test"
metadata: StatisticMetaData = {
"has_mean": True,
"has_sum": True,
"name": None,
"source": DOMAIN,
"statistic_id": statistic_id,
"unit_of_measurement": None,
}
statistics: StatisticData = {
"last_reset": precise_time,
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time,
"state": precise_number,
"sum": precise_number,
}
def check_columns(
schema_errors: set[str],
stored: Mapping,
expected: Mapping,
columns: tuple[str, ...],
table_name: str,
supports: str,
) -> None:
for column in columns:
if stored[column] != expected[column]:
schema_errors.add(f"{table_name}.{supports}")
_LOGGER.error(
"Column %s in database table %s does not support %s (stored=%s != expected=%s)",
column,
table_name,
supports,
stored[column],
expected[column],
)
# Insert / adjust a test statistics row in each of the tables
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
)
try:
# Mark the session as read_only to ensure that the test data is not committed
# to the database and we always rollback when the scope is exited
with session_scope(session=session_maker(), read_only=True) as session:
for table in tables:
_import_statistics_with_session(
instance, session, metadata, (statistics,), table
)
stored_statistics = _statistics_during_period_with_session(
hass,
session,
start_time,
None,
{statistic_id},
"hour" if table == Statistics else "5minute",
None,
{"last_reset", "max", "mean", "min", "state", "sum"},
)
if not (stored_statistic := stored_statistics.get(statistic_id)):
_LOGGER.warning(
"Schema validation failed for table: %s", table.__tablename__
)
continue
# We want to look at the last inserted row to make sure there
# is not previous garbage data in the table that would cause
# the test to produce an incorrect result. To achieve this,
# we inserted a row in the future, and now we select the last
# inserted row back.
last_stored_statistic = stored_statistic[-1]
check_columns(
schema_errors,
last_stored_statistic,
statistics,
("max", "mean", "min", "state", "sum"),
table.__tablename__,
"double precision",
)
assert statistics["last_reset"]
check_columns(
schema_errors,
last_stored_statistic,
{
"last_reset": datetime_to_timestamp_or_none(
statistics["last_reset"]
),
"start": datetime_to_timestamp_or_none(statistics["start"]),
},
("start", "last_reset"),
table.__tablename__,
"µs precision",
)
statistics_meta_manager.delete(session, statistic_ids=[statistic_id])
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception("Error when validating DB schema: %s", exc)
return schema_errors
def validate_db_schema(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
schema_errors |= _validate_db_schema_utf8(instance, session_maker)
schema_errors |= _validate_db_schema(hass, instance, session_maker)
if schema_errors:
_LOGGER.debug(
"Detected statistics schema errors: %s", ", ".join(sorted(schema_errors))
)
return schema_errors
def correct_db_schema(
instance: Recorder,
engine: Engine,
session_maker: Callable[[], Session],
schema_errors: set[str],
) -> None:
"""Correct issues detected by validate_db_schema."""
from ...migration import _modify_columns # pylint: disable=import-outside-toplevel
if "statistics_meta.4-byte UTF-8" in schema_errors:
# Attempt to convert the table to utf8mb4
_LOGGER.warning(
(
"Updating character set and collation of table %s to utf8mb4. "
"Note: this can take several minutes on large databases and slow "
"computers. Please be patient!"
),
"statistics_meta",
)
with contextlib.suppress(SQLAlchemyError), session_scope(
session=session_maker()
) as session:
connection = session.connection()
connection.execute(
# Using LOCK=EXCLUSIVE to prevent the database from corrupting
# https://github.com/home-assistant/core/issues/56104
text(
"ALTER TABLE statistics_meta CONVERT TO CHARACTER SET utf8mb4"
" COLLATE utf8mb4_unicode_ci, LOCK=EXCLUSIVE"
)
)
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
)
for table in tables:
if f"{table.__tablename__}.double precision" in schema_errors:
# Attempt to convert float columns to double precision
_modify_columns(
session_maker,
engine,
table.__tablename__,
[
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
],
)
if f"{table.__tablename__}.µs precision" in schema_errors:
# Attempt to convert timestamp columns to µs precision
_modify_columns(
session_maker,
engine,
table.__tablename__,
[
"last_reset_ts DOUBLE PRECISION",
"start_ts DOUBLE PRECISION",
],
)
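validate_db_schema returns a set of error identifiers (for example "statistics_meta.4-byte UTF-8" or "statistics.double precision") that correct_db_schema then repairs. The actual call sites are in the recorder core and are not part of this diff; an illustrative sketch of how the pair composes, based only on the signatures above:

# Illustrative only: instance, engine and session_maker come from the recorder core.
schema_errors = validate_db_schema(hass, instance, session_maker)
if schema_errors:
    correct_db_schema(instance, engine, session_maker, schema_errors)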


@@ -28,6 +28,14 @@ from homeassistant.core import HomeAssistant
from homeassistant.util.enum import try_parse_enum
from homeassistant.util.ulid import ulid_to_bytes
from .auto_repairs.statistics.duplicates import (
delete_statistics_duplicates,
delete_statistics_meta_duplicates,
)
from .auto_repairs.statistics.schema import (
correct_db_schema as statistics_correct_db_schema,
validate_db_schema as statistics_validate_db_schema,
)
from .const import SupportedDialect
from .db_schema import (
CONTEXT_ID_BIN_MAX_LENGTH,
@@ -55,13 +63,7 @@ from .queries import (
find_states_context_ids_to_migrate,
has_used_states_event_ids,
)
from .statistics import (
correct_db_schema as statistics_correct_db_schema,
delete_statistics_duplicates,
delete_statistics_meta_duplicates,
get_start_time,
validate_db_schema as statistics_validate_db_schema,
)
from .statistics import get_start_time
from .tasks import (
CommitTask,
PostSchemaMigrationTask,


@@ -2,34 +2,28 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Callable, Iterable, Mapping, Sequence
from collections.abc import Callable, Iterable, Sequence
import contextlib
import dataclasses
from datetime import datetime, timedelta
from functools import lru_cache, partial
from itertools import chain, groupby
import json
import logging
from operator import itemgetter
import os
import re
from statistics import mean
from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast
from sqlalchemy import Select, and_, bindparam, func, lambda_stmt, select, text
from sqlalchemy.engine import Engine
from sqlalchemy.engine.row import Row
from sqlalchemy.exc import OperationalError, SQLAlchemyError, StatementError
from sqlalchemy.exc import SQLAlchemyError, StatementError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.lambdas import StatementLambdaElement
import voluptuous as vol
from homeassistant.const import ATTR_UNIT_OF_MEASUREMENT
from homeassistant.core import HomeAssistant, callback, valid_entity_id
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.storage import STORAGE_DIR
from homeassistant.helpers.typing import UNDEFINED, UndefinedType
from homeassistant.util import dt as dt_util
from homeassistant.util.unit_conversion import (
@@ -53,14 +47,12 @@ from .const import (
DOMAIN,
EVENT_RECORDER_5MIN_STATISTICS_GENERATED,
EVENT_RECORDER_HOURLY_STATISTICS_GENERATED,
SQLITE_MAX_BIND_VARS,
SupportedDialect,
)
from .db_schema import (
STATISTICS_TABLES,
Statistics,
StatisticsBase,
StatisticsMeta,
StatisticsRuns,
StatisticsShortTerm,
)
@@ -73,7 +65,6 @@ from .models import (
process_timestamp,
)
from .util import (
database_job_retry_wrapper,
execute,
execute_stmt_lambda_element,
get_instance,
@@ -333,240 +324,6 @@ def get_start_time() -> datetime:
return last_period
def _find_duplicates(
session: Session, table: type[StatisticsBase]
) -> tuple[list[int], list[dict]]:
"""Find duplicated statistics."""
subquery = (
session.query(
table.start,
table.metadata_id,
literal_column("1").label("is_duplicate"),
)
.group_by(table.metadata_id, table.start)
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
# pylint: disable-next=not-callable
.having(func.count() > 1)
.subquery()
)
query = (
session.query(
table.id,
table.metadata_id,
table.created,
table.start,
table.mean,
table.min,
table.max,
table.last_reset,
table.state,
table.sum,
)
.outerjoin(
subquery,
(subquery.c.metadata_id == table.metadata_id)
& (subquery.c.start == table.start),
)
.filter(subquery.c.is_duplicate == 1)
.order_by(table.metadata_id, table.start, table.id.desc())
.limit(1000 * SQLITE_MAX_BIND_VARS)
)
duplicates = execute(query)
original_as_dict = {}
start = None
metadata_id = None
duplicate_ids: list[int] = []
non_identical_duplicates_as_dict: list[dict] = []
if not duplicates:
return (duplicate_ids, non_identical_duplicates_as_dict)
def columns_to_dict(duplicate: Row) -> dict:
"""Convert a SQLAlchemy row to dict."""
dict_ = {}
for key in (
"id",
"metadata_id",
"start",
"created",
"mean",
"min",
"max",
"last_reset",
"state",
"sum",
):
dict_[key] = getattr(duplicate, key)
return dict_
def compare_statistic_rows(row1: dict, row2: dict) -> bool:
"""Compare two statistics rows, ignoring id and created."""
ignore_keys = {"id", "created"}
keys1 = set(row1).difference(ignore_keys)
keys2 = set(row2).difference(ignore_keys)
return keys1 == keys2 and all(row1[k] == row2[k] for k in keys1)
for duplicate in duplicates:
if start != duplicate.start or metadata_id != duplicate.metadata_id:
original_as_dict = columns_to_dict(duplicate)
start = duplicate.start
metadata_id = duplicate.metadata_id
continue
duplicate_as_dict = columns_to_dict(duplicate)
duplicate_ids.append(duplicate.id)
if not compare_statistic_rows(original_as_dict, duplicate_as_dict):
non_identical_duplicates_as_dict.append(
{"duplicate": duplicate_as_dict, "original": original_as_dict}
)
return (duplicate_ids, non_identical_duplicates_as_dict)
def _delete_duplicates_from_table(
session: Session, table: type[StatisticsBase]
) -> tuple[int, list[dict]]:
"""Identify and delete duplicated statistics from a specified table."""
all_non_identical_duplicates: list[dict] = []
total_deleted_rows = 0
while True:
duplicate_ids, non_identical_duplicates = _find_duplicates(session, table)
if not duplicate_ids:
break
all_non_identical_duplicates.extend(non_identical_duplicates)
for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
deleted_rows = (
session.query(table)
.filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]))
.delete(synchronize_session=False)
)
total_deleted_rows += deleted_rows
return (total_deleted_rows, all_non_identical_duplicates)
@database_job_retry_wrapper("delete statistics duplicates", 3)
def delete_statistics_duplicates(
instance: Recorder, hass: HomeAssistant, session: Session
) -> None:
"""Identify and delete duplicated statistics.
A backup will be made of duplicated statistics before it is deleted.
"""
deleted_statistics_rows, non_identical_duplicates = _delete_duplicates_from_table(
session, Statistics
)
if deleted_statistics_rows:
_LOGGER.info("Deleted %s duplicated statistics rows", deleted_statistics_rows)
if non_identical_duplicates:
isotime = dt_util.utcnow().isoformat()
backup_file_name = f"deleted_statistics.{isotime}.json"
backup_path = hass.config.path(STORAGE_DIR, backup_file_name)
os.makedirs(os.path.dirname(backup_path), exist_ok=True)
with open(backup_path, "w", encoding="utf8") as backup_file:
json.dump(
non_identical_duplicates,
backup_file,
indent=4,
sort_keys=True,
cls=JSONEncoder,
)
_LOGGER.warning(
(
"Deleted %s non identical duplicated %s rows, a backup of the deleted"
" rows has been saved to %s"
),
len(non_identical_duplicates),
Statistics.__tablename__,
backup_path,
)
deleted_short_term_statistics_rows, _ = _delete_duplicates_from_table(
session, StatisticsShortTerm
)
if deleted_short_term_statistics_rows:
_LOGGER.warning(
"Deleted duplicated short term statistic rows, please report at %s",
"https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
)
def _find_statistics_meta_duplicates(session: Session) -> list[int]:
"""Find duplicated statistics_meta."""
# When querying the database, be careful to only explicitly query for columns
# which were present in schema version 29. If querying the table, SQLAlchemy
# will refer to future columns.
subquery = (
session.query(
StatisticsMeta.statistic_id,
literal_column("1").label("is_duplicate"),
)
.group_by(StatisticsMeta.statistic_id)
# https://github.com/sqlalchemy/sqlalchemy/issues/9189
# pylint: disable-next=not-callable
.having(func.count() > 1)
.subquery()
)
query = (
session.query(StatisticsMeta.statistic_id, StatisticsMeta.id)
.outerjoin(
subquery,
(subquery.c.statistic_id == StatisticsMeta.statistic_id),
)
.filter(subquery.c.is_duplicate == 1)
.order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc())
.limit(1000 * SQLITE_MAX_BIND_VARS)
)
duplicates = execute(query)
statistic_id = None
duplicate_ids: list[int] = []
if not duplicates:
return duplicate_ids
for duplicate in duplicates:
if statistic_id != duplicate.statistic_id:
statistic_id = duplicate.statistic_id
continue
duplicate_ids.append(duplicate.id)
return duplicate_ids
def _delete_statistics_meta_duplicates(session: Session) -> int:
"""Identify and delete duplicated statistics from a specified table."""
total_deleted_rows = 0
while True:
duplicate_ids = _find_statistics_meta_duplicates(session)
if not duplicate_ids:
break
for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
deleted_rows = (
session.query(StatisticsMeta)
.filter(
StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])
)
.delete(synchronize_session=False)
)
total_deleted_rows += deleted_rows
return total_deleted_rows
def delete_statistics_meta_duplicates(instance: Recorder, session: Session) -> None:
"""Identify and delete duplicated statistics_meta.
This is used when migrating from schema version 28 to schema version 29.
"""
deleted_statistics_rows = _delete_statistics_meta_duplicates(session)
if deleted_statistics_rows:
statistics_meta_manager = instance.statistics_meta_manager
statistics_meta_manager.reset()
statistics_meta_manager.load(session)
_LOGGER.info(
"Deleted %s duplicated statistics_meta rows", deleted_statistics_rows
)
def _compile_hourly_statistics_summary_mean_stmt(
start_time_ts: float, end_time_ts: float
) -> StatementLambdaElement:
@@ -2478,271 +2235,6 @@ def async_change_statistics_unit(
)
def _validate_db_schema_utf8(
instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
# Lack of full utf8 support is only an issue for MySQL / MariaDB
if instance.dialect_name != SupportedDialect.MYSQL:
return schema_errors
# This name can't be represented unless 4-byte UTF-8 unicode is supported
utf8_name = "𓆚𓃗"
statistic_id = f"{DOMAIN}.db_test"
metadata: StatisticMetaData = {
"has_mean": True,
"has_sum": True,
"name": utf8_name,
"source": DOMAIN,
"statistic_id": statistic_id,
"unit_of_measurement": None,
}
statistics_meta_manager = instance.statistics_meta_manager
# Try inserting some metadata which needs utfmb4 support
try:
# Mark the session as read_only to ensure that the test data is not committed
# to the database and we always rollback when the scope is exited
with session_scope(session=session_maker(), read_only=True) as session:
old_metadata_dict = statistics_meta_manager.get_many(
session, statistic_ids={statistic_id}
)
try:
statistics_meta_manager.update_or_add(
session, metadata, old_metadata_dict
)
statistics_meta_manager.delete(session, statistic_ids=[statistic_id])
except OperationalError as err:
if err.orig and err.orig.args[0] == 1366:
_LOGGER.debug(
"Database table statistics_meta does not support 4-byte UTF-8"
)
schema_errors.add("statistics_meta.4-byte UTF-8")
session.rollback()
else:
raise
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception("Error when validating DB schema: %s", exc)
return schema_errors
def _get_future_year() -> int:
"""Get a year in the future."""
return datetime.now().year + 1
def _validate_db_schema(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
statistics_meta_manager = instance.statistics_meta_manager
# Wrong precision is only an issue for MySQL / MariaDB / PostgreSQL
if instance.dialect_name not in (
SupportedDialect.MYSQL,
SupportedDialect.POSTGRESQL,
):
return schema_errors
# This number can't be accurately represented as a 32-bit float
precise_number = 1.000000000000001
# This time can't be accurately represented unless datetimes have µs precision
#
# We want to insert statistics for a time in the future, in case they
# have conflicting metadata_id's with existing statistics that were
# never cleaned up. By inserting in the future, we can be sure that
# that by selecting the last inserted row, we will get the one we
# just inserted.
#
future_year = _get_future_year()
precise_time = datetime(future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
start_time = datetime(future_year, 10, 6, tzinfo=dt_util.UTC)
statistic_id = f"{DOMAIN}.db_test"
metadata: StatisticMetaData = {
"has_mean": True,
"has_sum": True,
"name": None,
"source": DOMAIN,
"statistic_id": statistic_id,
"unit_of_measurement": None,
}
statistics: StatisticData = {
"last_reset": precise_time,
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time,
"state": precise_number,
"sum": precise_number,
}
def check_columns(
schema_errors: set[str],
stored: Mapping,
expected: Mapping,
columns: tuple[str, ...],
table_name: str,
supports: str,
) -> None:
for column in columns:
if stored[column] != expected[column]:
schema_errors.add(f"{table_name}.{supports}")
_LOGGER.error(
"Column %s in database table %s does not support %s (stored=%s != expected=%s)",
column,
table_name,
supports,
stored[column],
expected[column],
)
# Insert / adjust a test statistics row in each of the tables
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
)
try:
# Mark the session as read_only to ensure that the test data is not committed
# to the database and we always rollback when the scope is exited
with session_scope(session=session_maker(), read_only=True) as session:
for table in tables:
_import_statistics_with_session(
instance, session, metadata, (statistics,), table
)
stored_statistics = _statistics_during_period_with_session(
hass,
session,
start_time,
None,
{statistic_id},
"hour" if table == Statistics else "5minute",
None,
{"last_reset", "max", "mean", "min", "state", "sum"},
)
if not (stored_statistic := stored_statistics.get(statistic_id)):
_LOGGER.warning(
"Schema validation failed for table: %s", table.__tablename__
)
continue
# We want to look at the last inserted row to make sure there
# is not previous garbage data in the table that would cause
# the test to produce an incorrect result. To achieve this,
# we inserted a row in the future, and now we select the last
# inserted row back.
last_stored_statistic = stored_statistic[-1]
check_columns(
schema_errors,
last_stored_statistic,
statistics,
("max", "mean", "min", "state", "sum"),
table.__tablename__,
"double precision",
)
assert statistics["last_reset"]
check_columns(
schema_errors,
last_stored_statistic,
{
"last_reset": datetime_to_timestamp_or_none(
statistics["last_reset"]
),
"start": datetime_to_timestamp_or_none(statistics["start"]),
},
("start", "last_reset"),
table.__tablename__,
"µs precision",
)
statistics_meta_manager.delete(session, statistic_ids=[statistic_id])
except Exception as exc: # pylint: disable=broad-except
_LOGGER.exception("Error when validating DB schema: %s", exc)
return schema_errors
def validate_db_schema(
hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
"""Do some basic checks for common schema errors caused by manual migration."""
schema_errors: set[str] = set()
schema_errors |= _validate_db_schema_utf8(instance, session_maker)
schema_errors |= _validate_db_schema(hass, instance, session_maker)
if schema_errors:
_LOGGER.debug(
"Detected statistics schema errors: %s", ", ".join(sorted(schema_errors))
)
return schema_errors
def correct_db_schema(
instance: Recorder,
engine: Engine,
session_maker: Callable[[], Session],
schema_errors: set[str],
) -> None:
"""Correct issues detected by validate_db_schema."""
from .migration import _modify_columns # pylint: disable=import-outside-toplevel
if "statistics_meta.4-byte UTF-8" in schema_errors:
# Attempt to convert the table to utf8mb4
_LOGGER.warning(
(
"Updating character set and collation of table %s to utf8mb4. "
"Note: this can take several minutes on large databases and slow "
"computers. Please be patient!"
),
"statistics_meta",
)
with contextlib.suppress(SQLAlchemyError), session_scope(
session=session_maker()
) as session:
connection = session.connection()
connection.execute(
# Using LOCK=EXCLUSIVE to prevent the database from corrupting
# https://github.com/home-assistant/core/issues/56104
text(
"ALTER TABLE statistics_meta CONVERT TO CHARACTER SET utf8mb4"
" COLLATE utf8mb4_unicode_ci, LOCK=EXCLUSIVE"
)
)
tables: tuple[type[Statistics | StatisticsShortTerm], ...] = (
Statistics,
StatisticsShortTerm,
)
for table in tables:
if f"{table.__tablename__}.double precision" in schema_errors:
# Attempt to convert float columns to double precision
_modify_columns(
session_maker,
engine,
table.__tablename__,
[
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
],
)
if f"{table.__tablename__}.µs precision" in schema_errors:
# Attempt to convert timestamp columns to µs precision
_modify_columns(
session_maker,
engine,
table.__tablename__,
[
"last_reset_ts DOUBLE PRECISION",
"start_ts DOUBLE PRECISION",
],
)
def cleanup_statistics_timestamp_migration(instance: Recorder) -> bool:
"""Clean up the statistics migration from timestamp to datetime.


@@ -0,0 +1,5 @@
"""Tests for Recorder component."""
import pytest
pytest.register_assert_rewrite("tests.components.recorder.common")


@@ -0,0 +1,5 @@
"""Tests for Recorder component."""
import pytest
pytest.register_assert_rewrite("tests.components.recorder.common")


@@ -0,0 +1,330 @@
"""Test removing statistics duplicates."""
from collections.abc import Callable
# pylint: disable=invalid-name
import importlib
import sys
from unittest.mock import patch
import py
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from homeassistant.components import recorder
from homeassistant.components.recorder import statistics
from homeassistant.components.recorder.auto_repairs.statistics.duplicates import (
delete_statistics_duplicates,
delete_statistics_meta_duplicates,
)
from homeassistant.components.recorder.const import SQLITE_URL_PREFIX
from homeassistant.components.recorder.statistics import async_add_external_statistics
from homeassistant.components.recorder.util import session_scope
from homeassistant.core import HomeAssistant
from homeassistant.helpers import recorder as recorder_helper
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from ...common import wait_recording_done
from tests.common import get_test_home_assistant
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
def test_delete_duplicates_no_duplicates(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test removal of duplicated statistics."""
hass = hass_recorder()
wait_recording_done(hass)
instance = recorder.get_instance(hass)
with session_scope(hass=hass) as session:
delete_statistics_duplicates(instance, hass, session)
assert "duplicated statistics rows" not in caplog.text
assert "Found non identical" not in caplog.text
assert "Found duplicated" not in caplog.text
def test_duplicate_statistics_handle_integrity_error(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test the recorder does not blow up if statistics is duplicated."""
hass = hass_recorder()
wait_recording_done(hass)
period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00"))
period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00"))
external_energy_metadata_1 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_energy_statistics_1 = [
{
"start": period1,
"last_reset": None,
"state": 3,
"sum": 5,
},
]
external_energy_statistics_2 = [
{
"start": period2,
"last_reset": None,
"state": 3,
"sum": 6,
}
]
with patch.object(
statistics, "_statistics_exists", return_value=False
), patch.object(
statistics, "_insert_statistics", wraps=statistics._insert_statistics
) as insert_statistics_mock:
async_add_external_statistics(
hass, external_energy_metadata_1, external_energy_statistics_1
)
async_add_external_statistics(
hass, external_energy_metadata_1, external_energy_statistics_1
)
async_add_external_statistics(
hass, external_energy_metadata_1, external_energy_statistics_2
)
wait_recording_done(hass)
assert insert_statistics_mock.call_count == 3
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.Statistics).all()
assert len(tmp) == 2
assert "Blocked attempt to insert duplicated statistic rows" in caplog.text
def _create_engine_28(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_db_schema = sys.modules[module]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
def test_delete_metadata_duplicates(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_db_schema = sys.modules[module]
external_energy_metadata_1 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_energy_metadata_2 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_co2_metadata = {
"has_mean": True,
"has_sum": False,
"name": "Fossil percentage",
"source": "test",
"statistic_id": "test:fossil_percentage",
"unit_of_measurement": "%",
}
# Create some duplicated statistics_meta with schema version 28
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(
"homeassistant.components.recorder.core.create_engine", new=_create_engine_28
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 3
assert tmp[0].id == 1
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[1].id == 2
assert tmp[1].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[2].id == 3
assert tmp[2].statistic_id == "test:fossil_percentage"
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
# Test that the duplicates are removed during migration from schema 28
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
assert "Deleted 1 duplicated statistics_meta rows" in caplog.text
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 2
assert tmp[0].id == 2
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[1].id == 3
assert tmp[1].statistic_id == "test:fossil_percentage"
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
def test_delete_metadata_duplicates_many(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_db_schema = sys.modules[module]
external_energy_metadata_1 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_energy_metadata_2 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_2",
"unit_of_measurement": "kWh",
}
external_co2_metadata = {
"has_mean": True,
"has_sum": False,
"name": "Fossil percentage",
"source": "test",
"statistic_id": "test:fossil_percentage",
"unit_of_measurement": "%",
}
# Create some duplicated statistics with schema version 28
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(
"homeassistant.components.recorder.core.create_engine", new=_create_engine_28
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
for _ in range(1100):
session.add(
recorder.db_schema.StatisticsMeta.from_meta(
external_energy_metadata_1
)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
# Test that the duplicates are removed during migration from schema 28
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
assert "Deleted 1102 duplicated statistics_meta rows" in caplog.text
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 3
assert tmp[0].id == 1101
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[1].id == 1103
assert tmp[1].statistic_id == "test:total_energy_import_tariff_2"
assert tmp[2].id == 1105
assert tmp[2].statistic_id == "test:fossil_percentage"
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
def test_delete_metadata_duplicates_no_duplicates(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test removal of duplicated statistics."""
hass = hass_recorder()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
instance = recorder.get_instance(hass)
delete_statistics_meta_duplicates(instance, session)
assert "duplicated statistics_meta rows" not in caplog.text


@@ -0,0 +1,235 @@
"""The test repairing statistics schema."""
# pylint: disable=invalid-name
from datetime import datetime
from unittest.mock import ANY, DEFAULT, MagicMock, patch
import pytest
from sqlalchemy.exc import OperationalError
from homeassistant.components.recorder.auto_repairs.statistics.schema import (
_get_future_year,
)
from homeassistant.components.recorder.statistics import (
_statistics_during_period_with_session,
)
from homeassistant.components.recorder.table_managers.statistics_meta import (
StatisticsMetaManager,
)
from homeassistant.core import HomeAssistant
import homeassistant.util.dt as dt_util
from ...common import async_wait_recording_done
from tests.typing import RecorderInstanceGenerator
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
async def test_validate_db_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
) -> None:
"""Test validating DB schema with MySQL and PostgreSQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert "Detected statistics schema errors" not in caplog.text
assert "Database is about to correct DB schema errors" not in caplog.text
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
async def test_validate_db_schema_fix_utf8_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
utf8_error = OperationalError("", "", orig=orig_error)
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", "mysql"
), patch(
"homeassistant.components.recorder.table_managers.statistics_meta.StatisticsMetaManager.update_or_add",
wraps=StatisticsMetaManager.update_or_add,
side_effect=[utf8_error, DEFAULT, DEFAULT],
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
"Database is about to correct DB schema errors: statistics_meta.4-byte UTF-8"
in caplog.text
)
assert (
"Updating character set and collation of table statistics_meta to utf8mb4"
in caplog.text
)
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
@pytest.mark.parametrize(
("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1))
)
@pytest.mark.parametrize(
("column", "value"),
(("max", 1.0), ("mean", 1.0), ("min", 1.0), ("state", 1.0), ("sum", 1.0)),
)
async def test_validate_db_schema_fix_float_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
table,
replace_index,
column,
value,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
precise_number = 1.000000000000001
fixed_future_year = _get_future_year()
precise_time = datetime(fixed_future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
statistics = {
"recorder.db_test": [
{
"last_reset": precise_time.timestamp(),
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time.timestamp(),
"state": precise_number,
"sum": precise_number,
}
]
}
statistics["recorder.db_test"][0][column] = value
fake_statistics = [DEFAULT, DEFAULT]
fake_statistics[replace_index] = statistics
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
), patch(
"homeassistant.components.recorder.auto_repairs.statistics.schema._get_future_year",
return_value=fixed_future_year,
), patch(
"homeassistant.components.recorder.auto_repairs.statistics.schema._statistics_during_period_with_session",
side_effect=fake_statistics,
wraps=_statistics_during_period_with_session,
), patch(
"homeassistant.components.recorder.migration._modify_columns"
) as modify_columns_mock:
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
f"Database is about to correct DB schema errors: {table}.double precision"
in caplog.text
)
modification = [
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
]
modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification)
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize(
("db_engine", "modification"),
(
("mysql", ["last_reset_ts DOUBLE PRECISION", "start_ts DOUBLE PRECISION"]),
(
"postgresql",
[
"last_reset_ts DOUBLE PRECISION",
"start_ts DOUBLE PRECISION",
],
),
),
)
@pytest.mark.parametrize(
("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1))
)
@pytest.mark.parametrize(
("column", "value"),
(
("last_reset", "2020-10-06T00:00:00+00:00"),
("start", "2020-10-06T00:00:00+00:00"),
),
)
async def test_validate_db_schema_fix_statistics_datetime_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
modification,
table,
replace_index,
column,
value,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
precise_number = 1.000000000000001
precise_time = datetime(2020, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
statistics = {
"recorder.db_test": [
{
"last_reset": precise_time,
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time,
"state": precise_number,
"sum": precise_number,
}
]
}
statistics["recorder.db_test"][0][column] = value
fake_statistics = [DEFAULT, DEFAULT]
fake_statistics[replace_index] = statistics
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
), patch(
"homeassistant.components.recorder.auto_repairs.statistics.schema._statistics_during_period_with_session",
side_effect=fake_statistics,
wraps=_statistics_during_period_with_session,
), patch(
"homeassistant.components.recorder.migration._modify_columns"
) as modify_columns_mock:
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
f"Database is about to correct DB schema errors: {table}.µs precision"
in caplog.text
)
modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification)
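This test file also reflects the "fix patch targets" item from the commit message: helpers that moved must now be patched at their new module path. A small illustration (return_value is an arbitrary placeholder; the old target is inferred from the recorder/statistics.py deletions above):

from unittest.mock import patch

# New patch target after the move (as used in the tests above)
with patch(
    "homeassistant.components.recorder.auto_repairs.statistics.schema._get_future_year",
    return_value=2999,
):
    ...  # run the code under test here

# Before this commit the same helper lived in recorder/statistics.py, so the
# equivalent target was "homeassistant.components.recorder.statistics._get_future_year".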


@@ -2,20 +2,14 @@
from collections.abc import Callable
# pylint: disable=invalid-name
from datetime import datetime, timedelta
import importlib
import sys
from unittest.mock import ANY, DEFAULT, MagicMock, patch
from datetime import timedelta
from unittest.mock import patch
import py
import pytest
from sqlalchemy import create_engine, select
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session
from sqlalchemy import select
from homeassistant.components import recorder
from homeassistant.components.recorder import Recorder, history, statistics
from homeassistant.components.recorder.const import SQLITE_URL_PREFIX
from homeassistant.components.recorder.db_schema import StatisticsShortTerm
from homeassistant.components.recorder.models import (
datetime_to_timestamp_or_none,
@@ -26,12 +20,8 @@ from homeassistant.components.recorder.statistics import (
_generate_max_mean_min_statistic_in_sub_period_stmt,
_generate_statistics_at_time_stmt,
_generate_statistics_during_period_stmt,
_get_future_year,
_statistics_during_period_with_session,
async_add_external_statistics,
async_import_statistics,
delete_statistics_duplicates,
delete_statistics_meta_duplicates,
get_last_short_term_statistics,
get_last_statistics,
get_latest_short_term_statistics,
@@ -39,14 +29,12 @@ from homeassistant.components.recorder.statistics import (
list_statistic_ids,
)
from homeassistant.components.recorder.table_managers.statistics_meta import (
StatisticsMetaManager,
_generate_get_metadata_stmt,
)
from homeassistant.components.recorder.util import session_scope
from homeassistant.components.sensor import UNIT_CONVERTERS
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import recorder as recorder_helper
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
@@ -59,8 +47,8 @@ from .common import (
wait_recording_done,
)
from tests.common import get_test_home_assistant, mock_registry
from tests.typing import RecorderInstanceGenerator, WebSocketGenerator
from tests.common import mock_registry
from tests.typing import WebSocketGenerator
ORIG_TZ = dt_util.DEFAULT_TIME_ZONE
@@ -1254,515 +1242,6 @@ def test_monthly_statistics(
dt_util.set_default_time_zone(dt_util.get_time_zone("UTC"))
def test_delete_duplicates_no_duplicates(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test removal of duplicated statistics."""
hass = hass_recorder()
wait_recording_done(hass)
instance = recorder.get_instance(hass)
with session_scope(hass=hass) as session:
delete_statistics_duplicates(instance, hass, session)
assert "duplicated statistics rows" not in caplog.text
assert "Found non identical" not in caplog.text
assert "Found duplicated" not in caplog.text
def test_duplicate_statistics_handle_integrity_error(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test the recorder does not blow up if statistics is duplicated."""
hass = hass_recorder()
wait_recording_done(hass)
period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00"))
period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00"))
external_energy_metadata_1 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_energy_statistics_1 = [
{
"start": period1,
"last_reset": None,
"state": 3,
"sum": 5,
},
]
external_energy_statistics_2 = [
{
"start": period2,
"last_reset": None,
"state": 3,
"sum": 6,
}
]
with patch.object(
statistics, "_statistics_exists", return_value=False
), patch.object(
statistics, "_insert_statistics", wraps=statistics._insert_statistics
) as insert_statistics_mock:
async_add_external_statistics(
hass, external_energy_metadata_1, external_energy_statistics_1
)
async_add_external_statistics(
hass, external_energy_metadata_1, external_energy_statistics_1
)
async_add_external_statistics(
hass, external_energy_metadata_1, external_energy_statistics_2
)
wait_recording_done(hass)
assert insert_statistics_mock.call_count == 3
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.Statistics).all()
assert len(tmp) == 2
assert "Blocked attempt to insert duplicated statistic rows" in caplog.text
def _create_engine_28(*args, **kwargs):
"""Test version of create_engine that initializes with old schema.
This simulates an existing db with the old schema.
"""
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_db_schema = sys.modules[module]
engine = create_engine(*args, **kwargs)
old_db_schema.Base.metadata.create_all(engine)
with Session(engine) as session:
session.add(
recorder.db_schema.StatisticsRuns(start=statistics.get_start_time())
)
session.add(
recorder.db_schema.SchemaChanges(
schema_version=old_db_schema.SCHEMA_VERSION
)
)
session.commit()
return engine
def test_delete_metadata_duplicates(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_db_schema = sys.modules[module]
external_energy_metadata_1 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_energy_metadata_2 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_co2_metadata = {
"has_mean": True,
"has_sum": False,
"name": "Fossil percentage",
"source": "test",
"statistic_id": "test:fossil_percentage",
"unit_of_measurement": "%",
}
# Create some duplicated statistics_meta with schema version 28
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(
"homeassistant.components.recorder.core.create_engine", new=_create_engine_28
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 3
assert tmp[0].id == 1
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[1].id == 2
assert tmp[1].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[2].id == 3
assert tmp[2].statistic_id == "test:fossil_percentage"
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
# Test that the duplicates are removed during migration from schema 28
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
assert "Deleted 1 duplicated statistics_meta rows" in caplog.text
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 2
assert tmp[0].id == 2
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[1].id == 3
assert tmp[1].statistic_id == "test:fossil_percentage"
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
def test_delete_metadata_duplicates_many(
caplog: pytest.LogCaptureFixture, tmpdir: py.path.local
) -> None:
"""Test removal of duplicated statistics."""
test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db")
dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}"
module = "tests.components.recorder.db_schema_28"
importlib.import_module(module)
old_db_schema = sys.modules[module]
external_energy_metadata_1 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_1",
"unit_of_measurement": "kWh",
}
external_energy_metadata_2 = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "test",
"statistic_id": "test:total_energy_import_tariff_2",
"unit_of_measurement": "kWh",
}
external_co2_metadata = {
"has_mean": True,
"has_sum": False,
"name": "Fossil percentage",
"source": "test",
"statistic_id": "test:fossil_percentage",
"unit_of_measurement": "%",
}
    # Create some duplicated statistics_meta with schema version 28
with patch.object(recorder, "db_schema", old_db_schema), patch.object(
recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION
), patch(
"homeassistant.components.recorder.core.create_engine", new=_create_engine_28
):
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
wait_recording_done(hass)
wait_recording_done(hass)
with session_scope(hass=hass) as session:
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1)
)
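            # Add 1100 extra copies of the same metadata, presumably enough to
            # force the duplicate deletion to run in more than one chunk.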
for _ in range(1100):
session.add(
recorder.db_schema.StatisticsMeta.from_meta(
external_energy_metadata_1
)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
session.add(
recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata)
)
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
# Test that the duplicates are removed during migration from schema 28
hass = get_test_home_assistant()
recorder_helper.async_initialize_recorder(hass)
setup_component(hass, "recorder", {"recorder": {"db_url": dburl}})
hass.start()
wait_recording_done(hass)
wait_recording_done(hass)
assert "Deleted 1102 duplicated statistics_meta rows" in caplog.text
with session_scope(hass=hass) as session:
tmp = session.query(recorder.db_schema.StatisticsMeta).all()
assert len(tmp) == 3
assert tmp[0].id == 1101
assert tmp[0].statistic_id == "test:total_energy_import_tariff_1"
assert tmp[1].id == 1103
assert tmp[1].statistic_id == "test:total_energy_import_tariff_2"
assert tmp[2].id == 1105
assert tmp[2].statistic_id == "test:fossil_percentage"
hass.stop()
dt_util.DEFAULT_TIME_ZONE = ORIG_TZ
def test_delete_metadata_duplicates_no_duplicates(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None:
"""Test removal of duplicated statistics."""
hass = hass_recorder()
wait_recording_done(hass)
with session_scope(hass=hass) as session:
instance = recorder.get_instance(hass)
delete_statistics_meta_duplicates(instance, session)
assert "duplicated statistics_meta rows" not in caplog.text
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
async def test_validate_db_schema(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
) -> None:
"""Test validating DB schema with MySQL and PostgreSQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert "Detected statistics schema errors" not in caplog.text
assert "Database is about to correct DB schema errors" not in caplog.text
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
async def test_validate_db_schema_fix_utf8_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
utf8_error = OperationalError("", "", orig=orig_error)
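    # MySQL error 1366 ("Incorrect string value") on the first metadata write
    # triggers the 4-byte UTF-8 repair; the remaining DEFAULT side effects let
    # later calls fall through to the wrapped update_or_add.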
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", "mysql"
), patch(
"homeassistant.components.recorder.table_managers.statistics_meta.StatisticsMetaManager.update_or_add",
wraps=StatisticsMetaManager.update_or_add,
side_effect=[utf8_error, DEFAULT, DEFAULT],
):
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
"Database is about to correct DB schema errors: statistics_meta.4-byte UTF-8"
in caplog.text
)
assert (
"Updating character set and collation of table statistics_meta to utf8mb4"
in caplog.text
)
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize("db_engine", ("mysql", "postgresql"))
@pytest.mark.parametrize(
("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1))
)
@pytest.mark.parametrize(
("column", "value"),
(("max", 1.0), ("mean", 1.0), ("min", 1.0), ("state", 1.0), ("sum", 1.0)),
)
async def test_validate_db_schema_fix_float_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
table,
replace_index,
column,
value,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
precise_number = 1.000000000000001
fixed_future_year = _get_future_year()
precise_time = datetime(fixed_future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
statistics = {
"recorder.db_test": [
{
"last_reset": precise_time.timestamp(),
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time.timestamp(),
"state": precise_number,
"sum": precise_number,
}
]
}
statistics["recorder.db_test"][0][column] = value
fake_statistics = [DEFAULT, DEFAULT]
fake_statistics[replace_index] = statistics
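    # The fake result replaces what the validator reads back for the targeted
    # table, so the stored value appears to have lost precision (1.0 instead of
    # 1.000000000000001) and the column is flagged as needing double precision.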
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
), patch(
"homeassistant.components.recorder.statistics._get_future_year",
return_value=fixed_future_year,
), patch(
"homeassistant.components.recorder.statistics._statistics_during_period_with_session",
side_effect=fake_statistics,
wraps=_statistics_during_period_with_session,
), patch(
"homeassistant.components.recorder.migration._modify_columns"
) as modify_columns_mock:
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
f"Database is about to correct DB schema errors: {table}.double precision"
in caplog.text
)
modification = [
"mean DOUBLE PRECISION",
"min DOUBLE PRECISION",
"max DOUBLE PRECISION",
"state DOUBLE PRECISION",
"sum DOUBLE PRECISION",
]
modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification)
@pytest.mark.parametrize("enable_statistics_table_validation", [True])
@pytest.mark.parametrize(
("db_engine", "modification"),
(
("mysql", ["last_reset_ts DOUBLE PRECISION", "start_ts DOUBLE PRECISION"]),
(
"postgresql",
[
"last_reset_ts DOUBLE PRECISION",
"start_ts DOUBLE PRECISION",
],
),
),
)
@pytest.mark.parametrize(
("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1))
)
@pytest.mark.parametrize(
("column", "value"),
(
("last_reset", "2020-10-06T00:00:00+00:00"),
("start", "2020-10-06T00:00:00+00:00"),
),
)
async def test_validate_db_schema_fix_statistics_datetime_issue(
async_setup_recorder_instance: RecorderInstanceGenerator,
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
db_engine,
modification,
table,
replace_index,
column,
value,
) -> None:
"""Test validating DB schema with MySQL.
Note: The test uses SQLite, the purpose is only to exercise the code.
"""
orig_error = MagicMock()
orig_error.args = [1366]
precise_number = 1.000000000000001
precise_time = datetime(2020, 10, 6, microsecond=1, tzinfo=dt_util.UTC)
statistics = {
"recorder.db_test": [
{
"last_reset": precise_time,
"max": precise_number,
"mean": precise_number,
"min": precise_number,
"start": precise_time,
"state": precise_number,
"sum": precise_number,
}
]
}
statistics["recorder.db_test"][0][column] = value
fake_statistics = [DEFAULT, DEFAULT]
fake_statistics[replace_index] = statistics
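    # Returning the timestamp as a string without microseconds makes the value
    # read back differ from the microsecond-precise value written, so the column
    # is flagged as lacking µs precision.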
with patch(
"homeassistant.components.recorder.core.Recorder.dialect_name", db_engine
), patch(
"homeassistant.components.recorder.statistics._statistics_during_period_with_session",
side_effect=fake_statistics,
wraps=_statistics_during_period_with_session,
), patch(
"homeassistant.components.recorder.migration._modify_columns"
) as modify_columns_mock:
await async_setup_recorder_instance(hass)
await async_wait_recording_done(hass)
assert "Schema validation failed" not in caplog.text
assert (
f"Database is about to correct DB schema errors: {table}.µs precision"
in caplog.text
)
modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification)
def test_cache_key_for_generate_statistics_during_period_stmt() -> None:
"""Test cache key for _generate_statistics_during_period_stmt."""
columns = select(StatisticsShortTerm.metadata_id, StatisticsShortTerm.start_ts)

View File

@ -1282,13 +1282,16 @@ def hass_recorder(
# pylint: disable-next=import-outside-toplevel
from homeassistant.components import recorder
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.recorder.auto_repairs.statistics import schema
original_tz = dt_util.DEFAULT_TIME_ZONE
hass = get_test_home_assistant()
nightly = recorder.Recorder.async_nightly_tasks if enable_nightly_purge else None
stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None
stats_validate = (
-        recorder.statistics.validate_db_schema
+        schema.validate_db_schema
if enable_statistics_table_validation
else itertools.repeat(set())
)
@ -1397,13 +1400,16 @@ async def async_setup_recorder_instance(
# pylint: disable-next=import-outside-toplevel
from homeassistant.components import recorder
# pylint: disable-next=import-outside-toplevel
from homeassistant.components.recorder.auto_repairs.statistics import schema
# pylint: disable-next=import-outside-toplevel
from .components.recorder.common import async_recorder_block_till_done
nightly = recorder.Recorder.async_nightly_tasks if enable_nightly_purge else None
stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None
stats_validate = (
-        recorder.statistics.validate_db_schema
+        schema.validate_db_schema
if enable_statistics_table_validation
else itertools.repeat(set())
)