diff --git a/homeassistant/components/recorder/auto_repairs/__init__.py b/homeassistant/components/recorder/auto_repairs/__init__.py new file mode 100644 index 00000000000..aa3880bf1d6 --- /dev/null +++ b/homeassistant/components/recorder/auto_repairs/__init__.py @@ -0,0 +1 @@ +"""Repairs for Recorder.""" diff --git a/homeassistant/components/recorder/auto_repairs/statistics/__init__.py b/homeassistant/components/recorder/auto_repairs/statistics/__init__.py new file mode 100644 index 00000000000..64bfd4fbb29 --- /dev/null +++ b/homeassistant/components/recorder/auto_repairs/statistics/__init__.py @@ -0,0 +1 @@ +"""Statistics repairs for Recorder.""" diff --git a/homeassistant/components/recorder/auto_repairs/statistics/duplicates.py b/homeassistant/components/recorder/auto_repairs/statistics/duplicates.py new file mode 100644 index 00000000000..8a24dcbf92b --- /dev/null +++ b/homeassistant/components/recorder/auto_repairs/statistics/duplicates.py @@ -0,0 +1,261 @@ +"""Statistics duplication repairs.""" +from __future__ import annotations + +import json +import logging +import os +from typing import TYPE_CHECKING + +from sqlalchemy import func +from sqlalchemy.engine.row import Row +from sqlalchemy.orm.session import Session +from sqlalchemy.sql.expression import literal_column + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.json import JSONEncoder +from homeassistant.helpers.storage import STORAGE_DIR +from homeassistant.util import dt as dt_util + +from ...const import SQLITE_MAX_BIND_VARS +from ...db_schema import Statistics, StatisticsBase, StatisticsMeta, StatisticsShortTerm +from ...util import database_job_retry_wrapper, execute + +if TYPE_CHECKING: + from ... import Recorder + +_LOGGER = logging.getLogger(__name__) + + +def _find_duplicates( + session: Session, table: type[StatisticsBase] +) -> tuple[list[int], list[dict]]: + """Find duplicated statistics.""" + subquery = ( + session.query( + table.start, + table.metadata_id, + literal_column("1").label("is_duplicate"), + ) + .group_by(table.metadata_id, table.start) + # https://github.com/sqlalchemy/sqlalchemy/issues/9189 + # pylint: disable-next=not-callable + .having(func.count() > 1) + .subquery() + ) + query = ( + session.query( + table.id, + table.metadata_id, + table.created, + table.start, + table.mean, + table.min, + table.max, + table.last_reset, + table.state, + table.sum, + ) + .outerjoin( + subquery, + (subquery.c.metadata_id == table.metadata_id) + & (subquery.c.start == table.start), + ) + .filter(subquery.c.is_duplicate == 1) + .order_by(table.metadata_id, table.start, table.id.desc()) + .limit(1000 * SQLITE_MAX_BIND_VARS) + ) + duplicates = execute(query) + original_as_dict = {} + start = None + metadata_id = None + duplicate_ids: list[int] = [] + non_identical_duplicates_as_dict: list[dict] = [] + + if not duplicates: + return (duplicate_ids, non_identical_duplicates_as_dict) + + def columns_to_dict(duplicate: Row) -> dict: + """Convert a SQLAlchemy row to dict.""" + dict_ = {} + for key in ( + "id", + "metadata_id", + "start", + "created", + "mean", + "min", + "max", + "last_reset", + "state", + "sum", + ): + dict_[key] = getattr(duplicate, key) + return dict_ + + def compare_statistic_rows(row1: dict, row2: dict) -> bool: + """Compare two statistics rows, ignoring id and created.""" + ignore_keys = {"id", "created"} + keys1 = set(row1).difference(ignore_keys) + keys2 = set(row2).difference(ignore_keys) + return keys1 == keys2 and all(row1[k] == row2[k] for k in keys1) + + for duplicate 
in duplicates: + if start != duplicate.start or metadata_id != duplicate.metadata_id: + original_as_dict = columns_to_dict(duplicate) + start = duplicate.start + metadata_id = duplicate.metadata_id + continue + duplicate_as_dict = columns_to_dict(duplicate) + duplicate_ids.append(duplicate.id) + if not compare_statistic_rows(original_as_dict, duplicate_as_dict): + non_identical_duplicates_as_dict.append( + {"duplicate": duplicate_as_dict, "original": original_as_dict} + ) + + return (duplicate_ids, non_identical_duplicates_as_dict) + + +def _delete_duplicates_from_table( + session: Session, table: type[StatisticsBase] +) -> tuple[int, list[dict]]: + """Identify and delete duplicated statistics from a specified table.""" + all_non_identical_duplicates: list[dict] = [] + total_deleted_rows = 0 + while True: + duplicate_ids, non_identical_duplicates = _find_duplicates(session, table) + if not duplicate_ids: + break + all_non_identical_duplicates.extend(non_identical_duplicates) + for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS): + deleted_rows = ( + session.query(table) + .filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])) + .delete(synchronize_session=False) + ) + total_deleted_rows += deleted_rows + return (total_deleted_rows, all_non_identical_duplicates) + + +@database_job_retry_wrapper("delete statistics duplicates", 3) +def delete_statistics_duplicates( + instance: Recorder, hass: HomeAssistant, session: Session +) -> None: + """Identify and delete duplicated statistics. + + A backup will be made of duplicated statistics before it is deleted. + """ + deleted_statistics_rows, non_identical_duplicates = _delete_duplicates_from_table( + session, Statistics + ) + if deleted_statistics_rows: + _LOGGER.info("Deleted %s duplicated statistics rows", deleted_statistics_rows) + + if non_identical_duplicates: + isotime = dt_util.utcnow().isoformat() + backup_file_name = f"deleted_statistics.{isotime}.json" + backup_path = hass.config.path(STORAGE_DIR, backup_file_name) + + os.makedirs(os.path.dirname(backup_path), exist_ok=True) + with open(backup_path, "w", encoding="utf8") as backup_file: + json.dump( + non_identical_duplicates, + backup_file, + indent=4, + sort_keys=True, + cls=JSONEncoder, + ) + _LOGGER.warning( + ( + "Deleted %s non identical duplicated %s rows, a backup of the deleted" + " rows has been saved to %s" + ), + len(non_identical_duplicates), + Statistics.__tablename__, + backup_path, + ) + + deleted_short_term_statistics_rows, _ = _delete_duplicates_from_table( + session, StatisticsShortTerm + ) + if deleted_short_term_statistics_rows: + _LOGGER.warning( + "Deleted duplicated short term statistic rows, please report at %s", + "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22", + ) + + +def _find_statistics_meta_duplicates(session: Session) -> list[int]: + """Find duplicated statistics_meta.""" + # When querying the database, be careful to only explicitly query for columns + # which were present in schema version 29. If querying the table, SQLAlchemy + # will refer to future columns. 
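(Editor's aside, not part of the patch: the query assembled below uses the same detection pattern as _find_duplicates earlier in this file — group rows on the candidate key, keep only groups with more than one member via HAVING COUNT(*) > 1, join back to the table, and treat every row except the newest (highest id) as a duplicate. A rough standalone sketch of that idea in plain Python, with made-up (id, statistic_id) rows rather than the recorder's API:

    from collections import defaultdict

    # Hypothetical rows; in the real query these come from the statistics_meta table.
    rows = [(5, "sensor.energy"), (3, "sensor.energy"), (1, "sensor.energy"), (2, "sensor.power")]

    groups: dict[str, list[int]] = defaultdict(list)
    for row_id, statistic_id in sorted(rows, key=lambda r: (r[1], -r[0])):
        groups[statistic_id].append(row_id)

    # Keep the newest row (highest id) per statistic_id; everything else is a duplicate.
    duplicate_ids = [row_id for ids in groups.values() if len(ids) > 1 for row_id in ids[1:]]
    print(duplicate_ids)  # [3, 1]
)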
+ subquery = ( + session.query( + StatisticsMeta.statistic_id, + literal_column("1").label("is_duplicate"), + ) + .group_by(StatisticsMeta.statistic_id) + # https://github.com/sqlalchemy/sqlalchemy/issues/9189 + # pylint: disable-next=not-callable + .having(func.count() > 1) + .subquery() + ) + query = ( + session.query(StatisticsMeta.statistic_id, StatisticsMeta.id) + .outerjoin( + subquery, + (subquery.c.statistic_id == StatisticsMeta.statistic_id), + ) + .filter(subquery.c.is_duplicate == 1) + .order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc()) + .limit(1000 * SQLITE_MAX_BIND_VARS) + ) + duplicates = execute(query) + statistic_id = None + duplicate_ids: list[int] = [] + + if not duplicates: + return duplicate_ids + + for duplicate in duplicates: + if statistic_id != duplicate.statistic_id: + statistic_id = duplicate.statistic_id + continue + duplicate_ids.append(duplicate.id) + + return duplicate_ids + + +def _delete_statistics_meta_duplicates(session: Session) -> int: + """Identify and delete duplicated statistics from a specified table.""" + total_deleted_rows = 0 + while True: + duplicate_ids = _find_statistics_meta_duplicates(session) + if not duplicate_ids: + break + for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS): + deleted_rows = ( + session.query(StatisticsMeta) + .filter( + StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]) + ) + .delete(synchronize_session=False) + ) + total_deleted_rows += deleted_rows + return total_deleted_rows + + +@database_job_retry_wrapper("delete statistics meta duplicates", 3) +def delete_statistics_meta_duplicates(instance: Recorder, session: Session) -> None: + """Identify and delete duplicated statistics_meta. + + This is used when migrating from schema version 28 to schema version 29. + """ + deleted_statistics_rows = _delete_statistics_meta_duplicates(session) + if deleted_statistics_rows: + statistics_meta_manager = instance.statistics_meta_manager + statistics_meta_manager.reset() + statistics_meta_manager.load(session) + _LOGGER.info( + "Deleted %s duplicated statistics_meta rows", deleted_statistics_rows + ) diff --git a/homeassistant/components/recorder/auto_repairs/statistics/schema.py b/homeassistant/components/recorder/auto_repairs/statistics/schema.py new file mode 100644 index 00000000000..bbf59080ac1 --- /dev/null +++ b/homeassistant/components/recorder/auto_repairs/statistics/schema.py @@ -0,0 +1,295 @@ +"""Statistics schema repairs.""" +from __future__ import annotations + +from collections.abc import Callable, Mapping +import contextlib +from datetime import datetime +import logging +from typing import TYPE_CHECKING + +from sqlalchemy import text +from sqlalchemy.engine import Engine +from sqlalchemy.exc import OperationalError, SQLAlchemyError +from sqlalchemy.orm.session import Session + +from homeassistant.core import HomeAssistant +from homeassistant.util import dt as dt_util + +from ...const import DOMAIN, SupportedDialect +from ...db_schema import Statistics, StatisticsShortTerm +from ...models import StatisticData, StatisticMetaData, datetime_to_timestamp_or_none +from ...statistics import ( + _import_statistics_with_session, + _statistics_during_period_with_session, +) +from ...util import session_scope + +if TYPE_CHECKING: + from ... 
import Recorder + +_LOGGER = logging.getLogger(__name__) + + +def _validate_db_schema_utf8( + instance: Recorder, session_maker: Callable[[], Session] +) -> set[str]: + """Do some basic checks for common schema errors caused by manual migration.""" + schema_errors: set[str] = set() + + # Lack of full utf8 support is only an issue for MySQL / MariaDB + if instance.dialect_name != SupportedDialect.MYSQL: + return schema_errors + + # This name can't be represented unless 4-byte UTF-8 unicode is supported + utf8_name = "𓆚𓃗" + statistic_id = f"{DOMAIN}.db_test" + + metadata: StatisticMetaData = { + "has_mean": True, + "has_sum": True, + "name": utf8_name, + "source": DOMAIN, + "statistic_id": statistic_id, + "unit_of_measurement": None, + } + statistics_meta_manager = instance.statistics_meta_manager + + # Try inserting some metadata which needs utf8mb4 support + try: + # Mark the session as read_only to ensure that the test data is not committed + # to the database and we always rollback when the scope is exited + with session_scope(session=session_maker(), read_only=True) as session: + old_metadata_dict = statistics_meta_manager.get_many( + session, statistic_ids={statistic_id} + ) + try: + statistics_meta_manager.update_or_add( + session, metadata, old_metadata_dict + ) + statistics_meta_manager.delete(session, statistic_ids=[statistic_id]) + except OperationalError as err: + if err.orig and err.orig.args[0] == 1366: + _LOGGER.debug( + "Database table statistics_meta does not support 4-byte UTF-8" + ) + schema_errors.add("statistics_meta.4-byte UTF-8") + session.rollback() + else: + raise + except Exception as exc: # pylint: disable=broad-except + _LOGGER.exception("Error when validating DB schema: %s", exc) + return schema_errors + + +def _get_future_year() -> int: + """Get a year in the future.""" + return datetime.now().year + 1 + + +def _validate_db_schema( + hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session] +) -> set[str]: + """Do some basic checks for common schema errors caused by manual migration.""" + schema_errors: set[str] = set() + statistics_meta_manager = instance.statistics_meta_manager + + # Wrong precision is only an issue for MySQL / MariaDB / PostgreSQL + if instance.dialect_name not in ( + SupportedDialect.MYSQL, + SupportedDialect.POSTGRESQL, + ): + return schema_errors + + # This number can't be accurately represented as a 32-bit float + precise_number = 1.000000000000001 + # This time can't be accurately represented unless datetimes have µs precision + # + # We want to insert statistics for a time in the future, in case they + # have conflicting metadata_id's with existing statistics that were + # never cleaned up. By inserting in the future, we can be sure that + # that by selecting the last inserted row, we will get the one we + # just inserted. 
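(Editor's aside on the probe value, a standalone illustration rather than part of the patch: 1.000000000000001 needs more significant digits than a 32-bit float can hold, so if a column silently stores single precision the value comes back as exactly 1.0 and the mismatch is detected:

    import struct

    precise_number = 1.000000000000001
    round_tripped = struct.unpack("f", struct.pack("f", precise_number))[0]
    print(round_tripped)                    # 1.0
    print(round_tripped == precise_number)  # False -> column needs DOUBLE PRECISION
)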
+ # + future_year = _get_future_year() + precise_time = datetime(future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC) + start_time = datetime(future_year, 10, 6, tzinfo=dt_util.UTC) + statistic_id = f"{DOMAIN}.db_test" + + metadata: StatisticMetaData = { + "has_mean": True, + "has_sum": True, + "name": None, + "source": DOMAIN, + "statistic_id": statistic_id, + "unit_of_measurement": None, + } + statistics: StatisticData = { + "last_reset": precise_time, + "max": precise_number, + "mean": precise_number, + "min": precise_number, + "start": precise_time, + "state": precise_number, + "sum": precise_number, + } + + def check_columns( + schema_errors: set[str], + stored: Mapping, + expected: Mapping, + columns: tuple[str, ...], + table_name: str, + supports: str, + ) -> None: + for column in columns: + if stored[column] != expected[column]: + schema_errors.add(f"{table_name}.{supports}") + _LOGGER.error( + "Column %s in database table %s does not support %s (stored=%s != expected=%s)", + column, + table_name, + supports, + stored[column], + expected[column], + ) + + # Insert / adjust a test statistics row in each of the tables + tables: tuple[type[Statistics | StatisticsShortTerm], ...] = ( + Statistics, + StatisticsShortTerm, + ) + try: + # Mark the session as read_only to ensure that the test data is not committed + # to the database and we always rollback when the scope is exited + with session_scope(session=session_maker(), read_only=True) as session: + for table in tables: + _import_statistics_with_session( + instance, session, metadata, (statistics,), table + ) + stored_statistics = _statistics_during_period_with_session( + hass, + session, + start_time, + None, + {statistic_id}, + "hour" if table == Statistics else "5minute", + None, + {"last_reset", "max", "mean", "min", "state", "sum"}, + ) + if not (stored_statistic := stored_statistics.get(statistic_id)): + _LOGGER.warning( + "Schema validation failed for table: %s", table.__tablename__ + ) + continue + + # We want to look at the last inserted row to make sure there + # is not previous garbage data in the table that would cause + # the test to produce an incorrect result. To achieve this, + # we inserted a row in the future, and now we select the last + # inserted row back. 
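(Editor's aside: the µs-precision check below relies on the probe time carrying microsecond=1 — a timestamp column without sub-second precision drops that microsecond, so the stored value no longer compares equal. A minimal sketch with an assumed future date; the real check goes through datetime_to_timestamp_or_none:

    from datetime import datetime, timezone

    precise_time = datetime(2030, 10, 6, microsecond=1, tzinfo=timezone.utc)
    truncated = precise_time.replace(microsecond=0)  # what a low-precision column would keep
    print(precise_time.timestamp() == truncated.timestamp())  # False -> µs precision required
)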
+ last_stored_statistic = stored_statistic[-1] + check_columns( + schema_errors, + last_stored_statistic, + statistics, + ("max", "mean", "min", "state", "sum"), + table.__tablename__, + "double precision", + ) + assert statistics["last_reset"] + check_columns( + schema_errors, + last_stored_statistic, + { + "last_reset": datetime_to_timestamp_or_none( + statistics["last_reset"] + ), + "start": datetime_to_timestamp_or_none(statistics["start"]), + }, + ("start", "last_reset"), + table.__tablename__, + "µs precision", + ) + statistics_meta_manager.delete(session, statistic_ids=[statistic_id]) + except Exception as exc: # pylint: disable=broad-except + _LOGGER.exception("Error when validating DB schema: %s", exc) + + return schema_errors + + +def validate_db_schema( + hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session] +) -> set[str]: + """Do some basic checks for common schema errors caused by manual migration.""" + schema_errors: set[str] = set() + schema_errors |= _validate_db_schema_utf8(instance, session_maker) + schema_errors |= _validate_db_schema(hass, instance, session_maker) + if schema_errors: + _LOGGER.debug( + "Detected statistics schema errors: %s", ", ".join(sorted(schema_errors)) + ) + return schema_errors + + +def correct_db_schema( + instance: Recorder, + engine: Engine, + session_maker: Callable[[], Session], + schema_errors: set[str], +) -> None: + """Correct issues detected by validate_db_schema.""" + from ...migration import _modify_columns # pylint: disable=import-outside-toplevel + + if "statistics_meta.4-byte UTF-8" in schema_errors: + # Attempt to convert the table to utf8mb4 + _LOGGER.warning( + ( + "Updating character set and collation of table %s to utf8mb4. " + "Note: this can take several minutes on large databases and slow " + "computers. Please be patient!" + ), + "statistics_meta", + ) + with contextlib.suppress(SQLAlchemyError), session_scope( + session=session_maker() + ) as session: + connection = session.connection() + connection.execute( + # Using LOCK=EXCLUSIVE to prevent the database from corrupting + # https://github.com/home-assistant/core/issues/56104 + text( + "ALTER TABLE statistics_meta CONVERT TO CHARACTER SET utf8mb4" + " COLLATE utf8mb4_unicode_ci, LOCK=EXCLUSIVE" + ) + ) + + tables: tuple[type[Statistics | StatisticsShortTerm], ...] 
= ( + Statistics, + StatisticsShortTerm, + ) + for table in tables: + if f"{table.__tablename__}.double precision" in schema_errors: + # Attempt to convert float columns to double precision + _modify_columns( + session_maker, + engine, + table.__tablename__, + [ + "mean DOUBLE PRECISION", + "min DOUBLE PRECISION", + "max DOUBLE PRECISION", + "state DOUBLE PRECISION", + "sum DOUBLE PRECISION", + ], + ) + if f"{table.__tablename__}.µs precision" in schema_errors: + # Attempt to convert timestamp columns to µs precision + _modify_columns( + session_maker, + engine, + table.__tablename__, + [ + "last_reset_ts DOUBLE PRECISION", + "start_ts DOUBLE PRECISION", + ], + ) diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index 38eed25bee8..6fc2138d918 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -28,6 +28,14 @@ from homeassistant.core import HomeAssistant from homeassistant.util.enum import try_parse_enum from homeassistant.util.ulid import ulid_to_bytes +from .auto_repairs.statistics.duplicates import ( + delete_statistics_duplicates, + delete_statistics_meta_duplicates, +) +from .auto_repairs.statistics.schema import ( + correct_db_schema as statistics_correct_db_schema, + validate_db_schema as statistics_validate_db_schema, +) from .const import SupportedDialect from .db_schema import ( CONTEXT_ID_BIN_MAX_LENGTH, @@ -55,13 +63,7 @@ from .queries import ( find_states_context_ids_to_migrate, has_used_states_event_ids, ) -from .statistics import ( - correct_db_schema as statistics_correct_db_schema, - delete_statistics_duplicates, - delete_statistics_meta_duplicates, - get_start_time, - validate_db_schema as statistics_validate_db_schema, -) +from .statistics import get_start_time from .tasks import ( CommitTask, PostSchemaMigrationTask, diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py index 34adcbddcc6..82fbf7798f9 100644 --- a/homeassistant/components/recorder/statistics.py +++ b/homeassistant/components/recorder/statistics.py @@ -2,34 +2,28 @@ from __future__ import annotations from collections import defaultdict -from collections.abc import Callable, Iterable, Mapping, Sequence +from collections.abc import Callable, Iterable, Sequence import contextlib import dataclasses from datetime import datetime, timedelta from functools import lru_cache, partial from itertools import chain, groupby -import json import logging from operator import itemgetter -import os import re from statistics import mean from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast from sqlalchemy import Select, and_, bindparam, func, lambda_stmt, select, text -from sqlalchemy.engine import Engine from sqlalchemy.engine.row import Row -from sqlalchemy.exc import OperationalError, SQLAlchemyError, StatementError +from sqlalchemy.exc import SQLAlchemyError, StatementError from sqlalchemy.orm.session import Session -from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.lambdas import StatementLambdaElement import voluptuous as vol from homeassistant.const import ATTR_UNIT_OF_MEASUREMENT from homeassistant.core import HomeAssistant, callback, valid_entity_id from homeassistant.exceptions import HomeAssistantError -from homeassistant.helpers.json import JSONEncoder -from homeassistant.helpers.storage import STORAGE_DIR from homeassistant.helpers.typing import UNDEFINED, UndefinedType from homeassistant.util import dt 
as dt_util from homeassistant.util.unit_conversion import ( @@ -53,14 +47,12 @@ from .const import ( DOMAIN, EVENT_RECORDER_5MIN_STATISTICS_GENERATED, EVENT_RECORDER_HOURLY_STATISTICS_GENERATED, - SQLITE_MAX_BIND_VARS, SupportedDialect, ) from .db_schema import ( STATISTICS_TABLES, Statistics, StatisticsBase, - StatisticsMeta, StatisticsRuns, StatisticsShortTerm, ) @@ -73,7 +65,6 @@ from .models import ( process_timestamp, ) from .util import ( - database_job_retry_wrapper, execute, execute_stmt_lambda_element, get_instance, @@ -333,240 +324,6 @@ def get_start_time() -> datetime: return last_period -def _find_duplicates( - session: Session, table: type[StatisticsBase] -) -> tuple[list[int], list[dict]]: - """Find duplicated statistics.""" - subquery = ( - session.query( - table.start, - table.metadata_id, - literal_column("1").label("is_duplicate"), - ) - .group_by(table.metadata_id, table.start) - # https://github.com/sqlalchemy/sqlalchemy/issues/9189 - # pylint: disable-next=not-callable - .having(func.count() > 1) - .subquery() - ) - query = ( - session.query( - table.id, - table.metadata_id, - table.created, - table.start, - table.mean, - table.min, - table.max, - table.last_reset, - table.state, - table.sum, - ) - .outerjoin( - subquery, - (subquery.c.metadata_id == table.metadata_id) - & (subquery.c.start == table.start), - ) - .filter(subquery.c.is_duplicate == 1) - .order_by(table.metadata_id, table.start, table.id.desc()) - .limit(1000 * SQLITE_MAX_BIND_VARS) - ) - duplicates = execute(query) - original_as_dict = {} - start = None - metadata_id = None - duplicate_ids: list[int] = [] - non_identical_duplicates_as_dict: list[dict] = [] - - if not duplicates: - return (duplicate_ids, non_identical_duplicates_as_dict) - - def columns_to_dict(duplicate: Row) -> dict: - """Convert a SQLAlchemy row to dict.""" - dict_ = {} - for key in ( - "id", - "metadata_id", - "start", - "created", - "mean", - "min", - "max", - "last_reset", - "state", - "sum", - ): - dict_[key] = getattr(duplicate, key) - return dict_ - - def compare_statistic_rows(row1: dict, row2: dict) -> bool: - """Compare two statistics rows, ignoring id and created.""" - ignore_keys = {"id", "created"} - keys1 = set(row1).difference(ignore_keys) - keys2 = set(row2).difference(ignore_keys) - return keys1 == keys2 and all(row1[k] == row2[k] for k in keys1) - - for duplicate in duplicates: - if start != duplicate.start or metadata_id != duplicate.metadata_id: - original_as_dict = columns_to_dict(duplicate) - start = duplicate.start - metadata_id = duplicate.metadata_id - continue - duplicate_as_dict = columns_to_dict(duplicate) - duplicate_ids.append(duplicate.id) - if not compare_statistic_rows(original_as_dict, duplicate_as_dict): - non_identical_duplicates_as_dict.append( - {"duplicate": duplicate_as_dict, "original": original_as_dict} - ) - - return (duplicate_ids, non_identical_duplicates_as_dict) - - -def _delete_duplicates_from_table( - session: Session, table: type[StatisticsBase] -) -> tuple[int, list[dict]]: - """Identify and delete duplicated statistics from a specified table.""" - all_non_identical_duplicates: list[dict] = [] - total_deleted_rows = 0 - while True: - duplicate_ids, non_identical_duplicates = _find_duplicates(session, table) - if not duplicate_ids: - break - all_non_identical_duplicates.extend(non_identical_duplicates) - for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS): - deleted_rows = ( - session.query(table) - .filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])) - 
.delete(synchronize_session=False) - ) - total_deleted_rows += deleted_rows - return (total_deleted_rows, all_non_identical_duplicates) - - -@database_job_retry_wrapper("delete statistics duplicates", 3) -def delete_statistics_duplicates( - instance: Recorder, hass: HomeAssistant, session: Session -) -> None: - """Identify and delete duplicated statistics. - - A backup will be made of duplicated statistics before it is deleted. - """ - deleted_statistics_rows, non_identical_duplicates = _delete_duplicates_from_table( - session, Statistics - ) - if deleted_statistics_rows: - _LOGGER.info("Deleted %s duplicated statistics rows", deleted_statistics_rows) - - if non_identical_duplicates: - isotime = dt_util.utcnow().isoformat() - backup_file_name = f"deleted_statistics.{isotime}.json" - backup_path = hass.config.path(STORAGE_DIR, backup_file_name) - - os.makedirs(os.path.dirname(backup_path), exist_ok=True) - with open(backup_path, "w", encoding="utf8") as backup_file: - json.dump( - non_identical_duplicates, - backup_file, - indent=4, - sort_keys=True, - cls=JSONEncoder, - ) - _LOGGER.warning( - ( - "Deleted %s non identical duplicated %s rows, a backup of the deleted" - " rows has been saved to %s" - ), - len(non_identical_duplicates), - Statistics.__tablename__, - backup_path, - ) - - deleted_short_term_statistics_rows, _ = _delete_duplicates_from_table( - session, StatisticsShortTerm - ) - if deleted_short_term_statistics_rows: - _LOGGER.warning( - "Deleted duplicated short term statistic rows, please report at %s", - "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22", - ) - - -def _find_statistics_meta_duplicates(session: Session) -> list[int]: - """Find duplicated statistics_meta.""" - # When querying the database, be careful to only explicitly query for columns - # which were present in schema version 29. If querying the table, SQLAlchemy - # will refer to future columns. 
- subquery = ( - session.query( - StatisticsMeta.statistic_id, - literal_column("1").label("is_duplicate"), - ) - .group_by(StatisticsMeta.statistic_id) - # https://github.com/sqlalchemy/sqlalchemy/issues/9189 - # pylint: disable-next=not-callable - .having(func.count() > 1) - .subquery() - ) - query = ( - session.query(StatisticsMeta.statistic_id, StatisticsMeta.id) - .outerjoin( - subquery, - (subquery.c.statistic_id == StatisticsMeta.statistic_id), - ) - .filter(subquery.c.is_duplicate == 1) - .order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc()) - .limit(1000 * SQLITE_MAX_BIND_VARS) - ) - duplicates = execute(query) - statistic_id = None - duplicate_ids: list[int] = [] - - if not duplicates: - return duplicate_ids - - for duplicate in duplicates: - if statistic_id != duplicate.statistic_id: - statistic_id = duplicate.statistic_id - continue - duplicate_ids.append(duplicate.id) - - return duplicate_ids - - -def _delete_statistics_meta_duplicates(session: Session) -> int: - """Identify and delete duplicated statistics from a specified table.""" - total_deleted_rows = 0 - while True: - duplicate_ids = _find_statistics_meta_duplicates(session) - if not duplicate_ids: - break - for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS): - deleted_rows = ( - session.query(StatisticsMeta) - .filter( - StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]) - ) - .delete(synchronize_session=False) - ) - total_deleted_rows += deleted_rows - return total_deleted_rows - - -def delete_statistics_meta_duplicates(instance: Recorder, session: Session) -> None: - """Identify and delete duplicated statistics_meta. - - This is used when migrating from schema version 28 to schema version 29. - """ - deleted_statistics_rows = _delete_statistics_meta_duplicates(session) - if deleted_statistics_rows: - statistics_meta_manager = instance.statistics_meta_manager - statistics_meta_manager.reset() - statistics_meta_manager.load(session) - _LOGGER.info( - "Deleted %s duplicated statistics_meta rows", deleted_statistics_rows - ) - - def _compile_hourly_statistics_summary_mean_stmt( start_time_ts: float, end_time_ts: float ) -> StatementLambdaElement: @@ -2478,271 +2235,6 @@ def async_change_statistics_unit( ) -def _validate_db_schema_utf8( - instance: Recorder, session_maker: Callable[[], Session] -) -> set[str]: - """Do some basic checks for common schema errors caused by manual migration.""" - schema_errors: set[str] = set() - - # Lack of full utf8 support is only an issue for MySQL / MariaDB - if instance.dialect_name != SupportedDialect.MYSQL: - return schema_errors - - # This name can't be represented unless 4-byte UTF-8 unicode is supported - utf8_name = "𓆚𓃗" - statistic_id = f"{DOMAIN}.db_test" - - metadata: StatisticMetaData = { - "has_mean": True, - "has_sum": True, - "name": utf8_name, - "source": DOMAIN, - "statistic_id": statistic_id, - "unit_of_measurement": None, - } - statistics_meta_manager = instance.statistics_meta_manager - - # Try inserting some metadata which needs utfmb4 support - try: - # Mark the session as read_only to ensure that the test data is not committed - # to the database and we always rollback when the scope is exited - with session_scope(session=session_maker(), read_only=True) as session: - old_metadata_dict = statistics_meta_manager.get_many( - session, statistic_ids={statistic_id} - ) - try: - statistics_meta_manager.update_or_add( - session, metadata, old_metadata_dict - ) - statistics_meta_manager.delete(session, statistic_ids=[statistic_id]) - 
except OperationalError as err: - if err.orig and err.orig.args[0] == 1366: - _LOGGER.debug( - "Database table statistics_meta does not support 4-byte UTF-8" - ) - schema_errors.add("statistics_meta.4-byte UTF-8") - session.rollback() - else: - raise - except Exception as exc: # pylint: disable=broad-except - _LOGGER.exception("Error when validating DB schema: %s", exc) - return schema_errors - - -def _get_future_year() -> int: - """Get a year in the future.""" - return datetime.now().year + 1 - - -def _validate_db_schema( - hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session] -) -> set[str]: - """Do some basic checks for common schema errors caused by manual migration.""" - schema_errors: set[str] = set() - statistics_meta_manager = instance.statistics_meta_manager - - # Wrong precision is only an issue for MySQL / MariaDB / PostgreSQL - if instance.dialect_name not in ( - SupportedDialect.MYSQL, - SupportedDialect.POSTGRESQL, - ): - return schema_errors - - # This number can't be accurately represented as a 32-bit float - precise_number = 1.000000000000001 - # This time can't be accurately represented unless datetimes have µs precision - # - # We want to insert statistics for a time in the future, in case they - # have conflicting metadata_id's with existing statistics that were - # never cleaned up. By inserting in the future, we can be sure that - # that by selecting the last inserted row, we will get the one we - # just inserted. - # - future_year = _get_future_year() - precise_time = datetime(future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC) - start_time = datetime(future_year, 10, 6, tzinfo=dt_util.UTC) - statistic_id = f"{DOMAIN}.db_test" - - metadata: StatisticMetaData = { - "has_mean": True, - "has_sum": True, - "name": None, - "source": DOMAIN, - "statistic_id": statistic_id, - "unit_of_measurement": None, - } - statistics: StatisticData = { - "last_reset": precise_time, - "max": precise_number, - "mean": precise_number, - "min": precise_number, - "start": precise_time, - "state": precise_number, - "sum": precise_number, - } - - def check_columns( - schema_errors: set[str], - stored: Mapping, - expected: Mapping, - columns: tuple[str, ...], - table_name: str, - supports: str, - ) -> None: - for column in columns: - if stored[column] != expected[column]: - schema_errors.add(f"{table_name}.{supports}") - _LOGGER.error( - "Column %s in database table %s does not support %s (stored=%s != expected=%s)", - column, - table_name, - supports, - stored[column], - expected[column], - ) - - # Insert / adjust a test statistics row in each of the tables - tables: tuple[type[Statistics | StatisticsShortTerm], ...] 
= ( - Statistics, - StatisticsShortTerm, - ) - try: - # Mark the session as read_only to ensure that the test data is not committed - # to the database and we always rollback when the scope is exited - with session_scope(session=session_maker(), read_only=True) as session: - for table in tables: - _import_statistics_with_session( - instance, session, metadata, (statistics,), table - ) - stored_statistics = _statistics_during_period_with_session( - hass, - session, - start_time, - None, - {statistic_id}, - "hour" if table == Statistics else "5minute", - None, - {"last_reset", "max", "mean", "min", "state", "sum"}, - ) - if not (stored_statistic := stored_statistics.get(statistic_id)): - _LOGGER.warning( - "Schema validation failed for table: %s", table.__tablename__ - ) - continue - - # We want to look at the last inserted row to make sure there - # is not previous garbage data in the table that would cause - # the test to produce an incorrect result. To achieve this, - # we inserted a row in the future, and now we select the last - # inserted row back. - last_stored_statistic = stored_statistic[-1] - check_columns( - schema_errors, - last_stored_statistic, - statistics, - ("max", "mean", "min", "state", "sum"), - table.__tablename__, - "double precision", - ) - assert statistics["last_reset"] - check_columns( - schema_errors, - last_stored_statistic, - { - "last_reset": datetime_to_timestamp_or_none( - statistics["last_reset"] - ), - "start": datetime_to_timestamp_or_none(statistics["start"]), - }, - ("start", "last_reset"), - table.__tablename__, - "µs precision", - ) - statistics_meta_manager.delete(session, statistic_ids=[statistic_id]) - except Exception as exc: # pylint: disable=broad-except - _LOGGER.exception("Error when validating DB schema: %s", exc) - - return schema_errors - - -def validate_db_schema( - hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session] -) -> set[str]: - """Do some basic checks for common schema errors caused by manual migration.""" - schema_errors: set[str] = set() - schema_errors |= _validate_db_schema_utf8(instance, session_maker) - schema_errors |= _validate_db_schema(hass, instance, session_maker) - if schema_errors: - _LOGGER.debug( - "Detected statistics schema errors: %s", ", ".join(sorted(schema_errors)) - ) - return schema_errors - - -def correct_db_schema( - instance: Recorder, - engine: Engine, - session_maker: Callable[[], Session], - schema_errors: set[str], -) -> None: - """Correct issues detected by validate_db_schema.""" - from .migration import _modify_columns # pylint: disable=import-outside-toplevel - - if "statistics_meta.4-byte UTF-8" in schema_errors: - # Attempt to convert the table to utf8mb4 - _LOGGER.warning( - ( - "Updating character set and collation of table %s to utf8mb4. " - "Note: this can take several minutes on large databases and slow " - "computers. Please be patient!" - ), - "statistics_meta", - ) - with contextlib.suppress(SQLAlchemyError), session_scope( - session=session_maker() - ) as session: - connection = session.connection() - connection.execute( - # Using LOCK=EXCLUSIVE to prevent the database from corrupting - # https://github.com/home-assistant/core/issues/56104 - text( - "ALTER TABLE statistics_meta CONVERT TO CHARACTER SET utf8mb4" - " COLLATE utf8mb4_unicode_ci, LOCK=EXCLUSIVE" - ) - ) - - tables: tuple[type[Statistics | StatisticsShortTerm], ...] 
= ( - Statistics, - StatisticsShortTerm, - ) - for table in tables: - if f"{table.__tablename__}.double precision" in schema_errors: - # Attempt to convert float columns to double precision - _modify_columns( - session_maker, - engine, - table.__tablename__, - [ - "mean DOUBLE PRECISION", - "min DOUBLE PRECISION", - "max DOUBLE PRECISION", - "state DOUBLE PRECISION", - "sum DOUBLE PRECISION", - ], - ) - if f"{table.__tablename__}.µs precision" in schema_errors: - # Attempt to convert timestamp columns to µs precision - _modify_columns( - session_maker, - engine, - table.__tablename__, - [ - "last_reset_ts DOUBLE PRECISION", - "start_ts DOUBLE PRECISION", - ], - ) - - def cleanup_statistics_timestamp_migration(instance: Recorder) -> bool: """Clean up the statistics migration from timestamp to datetime. diff --git a/tests/components/recorder/auto_repairs/__init__.py b/tests/components/recorder/auto_repairs/__init__.py new file mode 100644 index 00000000000..6e98d881ea9 --- /dev/null +++ b/tests/components/recorder/auto_repairs/__init__.py @@ -0,0 +1,5 @@ +"""Tests for Recorder component.""" + +import pytest + +pytest.register_assert_rewrite("tests.components.recorder.common") diff --git a/tests/components/recorder/auto_repairs/statistics/__init__.py b/tests/components/recorder/auto_repairs/statistics/__init__.py new file mode 100644 index 00000000000..6e98d881ea9 --- /dev/null +++ b/tests/components/recorder/auto_repairs/statistics/__init__.py @@ -0,0 +1,5 @@ +"""Tests for Recorder component.""" + +import pytest + +pytest.register_assert_rewrite("tests.components.recorder.common") diff --git a/tests/components/recorder/auto_repairs/statistics/test_duplicates.py b/tests/components/recorder/auto_repairs/statistics/test_duplicates.py new file mode 100644 index 00000000000..53dac0b6ab2 --- /dev/null +++ b/tests/components/recorder/auto_repairs/statistics/test_duplicates.py @@ -0,0 +1,330 @@ +"""Test removing statistics duplicates.""" +from collections.abc import Callable + +# pylint: disable=invalid-name +import importlib +import sys +from unittest.mock import patch + +import py +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import Session + +from homeassistant.components import recorder +from homeassistant.components.recorder import statistics +from homeassistant.components.recorder.auto_repairs.statistics.duplicates import ( + delete_statistics_duplicates, + delete_statistics_meta_duplicates, +) +from homeassistant.components.recorder.const import SQLITE_URL_PREFIX +from homeassistant.components.recorder.statistics import async_add_external_statistics +from homeassistant.components.recorder.util import session_scope +from homeassistant.core import HomeAssistant +from homeassistant.helpers import recorder as recorder_helper +from homeassistant.setup import setup_component +import homeassistant.util.dt as dt_util + +from ...common import wait_recording_done + +from tests.common import get_test_home_assistant + +ORIG_TZ = dt_util.DEFAULT_TIME_ZONE + + +def test_delete_duplicates_no_duplicates( + hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture +) -> None: + """Test removal of duplicated statistics.""" + hass = hass_recorder() + wait_recording_done(hass) + instance = recorder.get_instance(hass) + with session_scope(hass=hass) as session: + delete_statistics_duplicates(instance, hass, session) + assert "duplicated statistics rows" not in caplog.text + assert "Found non identical" not in caplog.text + assert "Found duplicated" not in 
caplog.text + + +def test_duplicate_statistics_handle_integrity_error( + hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture +) -> None: + """Test the recorder does not blow up if statistics is duplicated.""" + hass = hass_recorder() + wait_recording_done(hass) + + period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00")) + period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00")) + + external_energy_metadata_1 = { + "has_mean": False, + "has_sum": True, + "name": "Total imported energy", + "source": "test", + "statistic_id": "test:total_energy_import_tariff_1", + "unit_of_measurement": "kWh", + } + external_energy_statistics_1 = [ + { + "start": period1, + "last_reset": None, + "state": 3, + "sum": 5, + }, + ] + external_energy_statistics_2 = [ + { + "start": period2, + "last_reset": None, + "state": 3, + "sum": 6, + } + ] + + with patch.object( + statistics, "_statistics_exists", return_value=False + ), patch.object( + statistics, "_insert_statistics", wraps=statistics._insert_statistics + ) as insert_statistics_mock: + async_add_external_statistics( + hass, external_energy_metadata_1, external_energy_statistics_1 + ) + async_add_external_statistics( + hass, external_energy_metadata_1, external_energy_statistics_1 + ) + async_add_external_statistics( + hass, external_energy_metadata_1, external_energy_statistics_2 + ) + wait_recording_done(hass) + assert insert_statistics_mock.call_count == 3 + + with session_scope(hass=hass) as session: + tmp = session.query(recorder.db_schema.Statistics).all() + assert len(tmp) == 2 + + assert "Blocked attempt to insert duplicated statistic rows" in caplog.text + + +def _create_engine_28(*args, **kwargs): + """Test version of create_engine that initializes with old schema. + + This simulates an existing db with the old schema. 
+ """ + module = "tests.components.recorder.db_schema_28" + importlib.import_module(module) + old_db_schema = sys.modules[module] + engine = create_engine(*args, **kwargs) + old_db_schema.Base.metadata.create_all(engine) + with Session(engine) as session: + session.add( + recorder.db_schema.StatisticsRuns(start=statistics.get_start_time()) + ) + session.add( + recorder.db_schema.SchemaChanges( + schema_version=old_db_schema.SCHEMA_VERSION + ) + ) + session.commit() + return engine + + +def test_delete_metadata_duplicates( + caplog: pytest.LogCaptureFixture, tmpdir: py.path.local +) -> None: + """Test removal of duplicated statistics.""" + test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") + dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" + + module = "tests.components.recorder.db_schema_28" + importlib.import_module(module) + old_db_schema = sys.modules[module] + + external_energy_metadata_1 = { + "has_mean": False, + "has_sum": True, + "name": "Total imported energy", + "source": "test", + "statistic_id": "test:total_energy_import_tariff_1", + "unit_of_measurement": "kWh", + } + external_energy_metadata_2 = { + "has_mean": False, + "has_sum": True, + "name": "Total imported energy", + "source": "test", + "statistic_id": "test:total_energy_import_tariff_1", + "unit_of_measurement": "kWh", + } + external_co2_metadata = { + "has_mean": True, + "has_sum": False, + "name": "Fossil percentage", + "source": "test", + "statistic_id": "test:fossil_percentage", + "unit_of_measurement": "%", + } + + # Create some duplicated statistics_meta with schema version 28 + with patch.object(recorder, "db_schema", old_db_schema), patch.object( + recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION + ), patch( + "homeassistant.components.recorder.core.create_engine", new=_create_engine_28 + ): + hass = get_test_home_assistant() + recorder_helper.async_initialize_recorder(hass) + setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) + wait_recording_done(hass) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1) + ) + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2) + ) + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata) + ) + + with session_scope(hass=hass) as session: + tmp = session.query(recorder.db_schema.StatisticsMeta).all() + assert len(tmp) == 3 + assert tmp[0].id == 1 + assert tmp[0].statistic_id == "test:total_energy_import_tariff_1" + assert tmp[1].id == 2 + assert tmp[1].statistic_id == "test:total_energy_import_tariff_1" + assert tmp[2].id == 3 + assert tmp[2].statistic_id == "test:fossil_percentage" + + hass.stop() + dt_util.DEFAULT_TIME_ZONE = ORIG_TZ + + # Test that the duplicates are removed during migration from schema 28 + hass = get_test_home_assistant() + recorder_helper.async_initialize_recorder(hass) + setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) + hass.start() + wait_recording_done(hass) + wait_recording_done(hass) + + assert "Deleted 1 duplicated statistics_meta rows" in caplog.text + with session_scope(hass=hass) as session: + tmp = session.query(recorder.db_schema.StatisticsMeta).all() + assert len(tmp) == 2 + assert tmp[0].id == 2 + assert tmp[0].statistic_id == "test:total_energy_import_tariff_1" + assert tmp[1].id == 3 + assert tmp[1].statistic_id == "test:fossil_percentage" + + hass.stop() + dt_util.DEFAULT_TIME_ZONE = ORIG_TZ + + +def 
test_delete_metadata_duplicates_many( + caplog: pytest.LogCaptureFixture, tmpdir: py.path.local +) -> None: + """Test removal of duplicated statistics.""" + test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") + dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" + + module = "tests.components.recorder.db_schema_28" + importlib.import_module(module) + old_db_schema = sys.modules[module] + + external_energy_metadata_1 = { + "has_mean": False, + "has_sum": True, + "name": "Total imported energy", + "source": "test", + "statistic_id": "test:total_energy_import_tariff_1", + "unit_of_measurement": "kWh", + } + external_energy_metadata_2 = { + "has_mean": False, + "has_sum": True, + "name": "Total imported energy", + "source": "test", + "statistic_id": "test:total_energy_import_tariff_2", + "unit_of_measurement": "kWh", + } + external_co2_metadata = { + "has_mean": True, + "has_sum": False, + "name": "Fossil percentage", + "source": "test", + "statistic_id": "test:fossil_percentage", + "unit_of_measurement": "%", + } + + # Create some duplicated statistics with schema version 28 + with patch.object(recorder, "db_schema", old_db_schema), patch.object( + recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION + ), patch( + "homeassistant.components.recorder.core.create_engine", new=_create_engine_28 + ): + hass = get_test_home_assistant() + recorder_helper.async_initialize_recorder(hass) + setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) + wait_recording_done(hass) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1) + ) + for _ in range(1100): + session.add( + recorder.db_schema.StatisticsMeta.from_meta( + external_energy_metadata_1 + ) + ) + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2) + ) + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2) + ) + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata) + ) + session.add( + recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata) + ) + + hass.stop() + dt_util.DEFAULT_TIME_ZONE = ORIG_TZ + + # Test that the duplicates are removed during migration from schema 28 + hass = get_test_home_assistant() + recorder_helper.async_initialize_recorder(hass) + setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) + hass.start() + wait_recording_done(hass) + wait_recording_done(hass) + + assert "Deleted 1102 duplicated statistics_meta rows" in caplog.text + with session_scope(hass=hass) as session: + tmp = session.query(recorder.db_schema.StatisticsMeta).all() + assert len(tmp) == 3 + assert tmp[0].id == 1101 + assert tmp[0].statistic_id == "test:total_energy_import_tariff_1" + assert tmp[1].id == 1103 + assert tmp[1].statistic_id == "test:total_energy_import_tariff_2" + assert tmp[2].id == 1105 + assert tmp[2].statistic_id == "test:fossil_percentage" + + hass.stop() + dt_util.DEFAULT_TIME_ZONE = ORIG_TZ + + +def test_delete_metadata_duplicates_no_duplicates( + hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture +) -> None: + """Test removal of duplicated statistics.""" + hass = hass_recorder() + wait_recording_done(hass) + with session_scope(hass=hass) as session: + instance = recorder.get_instance(hass) + delete_statistics_meta_duplicates(instance, session) + assert "duplicated statistics_meta rows" not in caplog.text diff --git 
a/tests/components/recorder/auto_repairs/statistics/test_schema.py b/tests/components/recorder/auto_repairs/statistics/test_schema.py new file mode 100644 index 00000000000..2c4e06580f5 --- /dev/null +++ b/tests/components/recorder/auto_repairs/statistics/test_schema.py @@ -0,0 +1,235 @@ +"""The test repairing statistics schema.""" + +# pylint: disable=invalid-name +from datetime import datetime +from unittest.mock import ANY, DEFAULT, MagicMock, patch + +import pytest +from sqlalchemy.exc import OperationalError + +from homeassistant.components.recorder.auto_repairs.statistics.schema import ( + _get_future_year, +) +from homeassistant.components.recorder.statistics import ( + _statistics_during_period_with_session, +) +from homeassistant.components.recorder.table_managers.statistics_meta import ( + StatisticsMetaManager, +) +from homeassistant.core import HomeAssistant +import homeassistant.util.dt as dt_util + +from ...common import async_wait_recording_done + +from tests.typing import RecorderInstanceGenerator + + +@pytest.mark.parametrize("enable_statistics_table_validation", [True]) +@pytest.mark.parametrize("db_engine", ("mysql", "postgresql")) +async def test_validate_db_schema( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, + caplog: pytest.LogCaptureFixture, + db_engine, +) -> None: + """Test validating DB schema with MySQL and PostgreSQL. + + Note: The test uses SQLite, the purpose is only to exercise the code. + """ + with patch( + "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine + ): + await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + assert "Schema validation failed" not in caplog.text + assert "Detected statistics schema errors" not in caplog.text + assert "Database is about to correct DB schema errors" not in caplog.text + + +@pytest.mark.parametrize("enable_statistics_table_validation", [True]) +async def test_validate_db_schema_fix_utf8_issue( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, + caplog: pytest.LogCaptureFixture, +) -> None: + """Test validating DB schema with MySQL. + + Note: The test uses SQLite, the purpose is only to exercise the code. 
+ """ + orig_error = MagicMock() + orig_error.args = [1366] + utf8_error = OperationalError("", "", orig=orig_error) + with patch( + "homeassistant.components.recorder.core.Recorder.dialect_name", "mysql" + ), patch( + "homeassistant.components.recorder.table_managers.statistics_meta.StatisticsMetaManager.update_or_add", + wraps=StatisticsMetaManager.update_or_add, + side_effect=[utf8_error, DEFAULT, DEFAULT], + ): + await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + + assert "Schema validation failed" not in caplog.text + assert ( + "Database is about to correct DB schema errors: statistics_meta.4-byte UTF-8" + in caplog.text + ) + assert ( + "Updating character set and collation of table statistics_meta to utf8mb4" + in caplog.text + ) + + +@pytest.mark.parametrize("enable_statistics_table_validation", [True]) +@pytest.mark.parametrize("db_engine", ("mysql", "postgresql")) +@pytest.mark.parametrize( + ("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1)) +) +@pytest.mark.parametrize( + ("column", "value"), + (("max", 1.0), ("mean", 1.0), ("min", 1.0), ("state", 1.0), ("sum", 1.0)), +) +async def test_validate_db_schema_fix_float_issue( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, + caplog: pytest.LogCaptureFixture, + db_engine, + table, + replace_index, + column, + value, +) -> None: + """Test validating DB schema with MySQL. + + Note: The test uses SQLite, the purpose is only to exercise the code. + """ + orig_error = MagicMock() + orig_error.args = [1366] + precise_number = 1.000000000000001 + fixed_future_year = _get_future_year() + precise_time = datetime(fixed_future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC) + statistics = { + "recorder.db_test": [ + { + "last_reset": precise_time.timestamp(), + "max": precise_number, + "mean": precise_number, + "min": precise_number, + "start": precise_time.timestamp(), + "state": precise_number, + "sum": precise_number, + } + ] + } + statistics["recorder.db_test"][0][column] = value + fake_statistics = [DEFAULT, DEFAULT] + fake_statistics[replace_index] = statistics + + with patch( + "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine + ), patch( + "homeassistant.components.recorder.auto_repairs.statistics.schema._get_future_year", + return_value=fixed_future_year, + ), patch( + "homeassistant.components.recorder.auto_repairs.statistics.schema._statistics_during_period_with_session", + side_effect=fake_statistics, + wraps=_statistics_during_period_with_session, + ), patch( + "homeassistant.components.recorder.migration._modify_columns" + ) as modify_columns_mock: + await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + + assert "Schema validation failed" not in caplog.text + assert ( + f"Database is about to correct DB schema errors: {table}.double precision" + in caplog.text + ) + modification = [ + "mean DOUBLE PRECISION", + "min DOUBLE PRECISION", + "max DOUBLE PRECISION", + "state DOUBLE PRECISION", + "sum DOUBLE PRECISION", + ] + modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification) + + +@pytest.mark.parametrize("enable_statistics_table_validation", [True]) +@pytest.mark.parametrize( + ("db_engine", "modification"), + ( + ("mysql", ["last_reset_ts DOUBLE PRECISION", "start_ts DOUBLE PRECISION"]), + ( + "postgresql", + [ + "last_reset_ts DOUBLE PRECISION", + "start_ts DOUBLE PRECISION", + ], + ), + ), +) +@pytest.mark.parametrize( + ("table", "replace_index"), 
(("statistics", 0), ("statistics_short_term", 1)) +) +@pytest.mark.parametrize( + ("column", "value"), + ( + ("last_reset", "2020-10-06T00:00:00+00:00"), + ("start", "2020-10-06T00:00:00+00:00"), + ), +) +async def test_validate_db_schema_fix_statistics_datetime_issue( + async_setup_recorder_instance: RecorderInstanceGenerator, + hass: HomeAssistant, + caplog: pytest.LogCaptureFixture, + db_engine, + modification, + table, + replace_index, + column, + value, +) -> None: + """Test validating DB schema with MySQL. + + Note: The test uses SQLite, the purpose is only to exercise the code. + """ + orig_error = MagicMock() + orig_error.args = [1366] + precise_number = 1.000000000000001 + precise_time = datetime(2020, 10, 6, microsecond=1, tzinfo=dt_util.UTC) + statistics = { + "recorder.db_test": [ + { + "last_reset": precise_time, + "max": precise_number, + "mean": precise_number, + "min": precise_number, + "start": precise_time, + "state": precise_number, + "sum": precise_number, + } + ] + } + statistics["recorder.db_test"][0][column] = value + fake_statistics = [DEFAULT, DEFAULT] + fake_statistics[replace_index] = statistics + + with patch( + "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine + ), patch( + "homeassistant.components.recorder.auto_repairs.statistics.schema._statistics_during_period_with_session", + side_effect=fake_statistics, + wraps=_statistics_during_period_with_session, + ), patch( + "homeassistant.components.recorder.migration._modify_columns" + ) as modify_columns_mock: + await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + + assert "Schema validation failed" not in caplog.text + assert ( + f"Database is about to correct DB schema errors: {table}.µs precision" + in caplog.text + ) + modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification) diff --git a/tests/components/recorder/test_statistics.py b/tests/components/recorder/test_statistics.py index d783d72be2d..ebad039ca45 100644 --- a/tests/components/recorder/test_statistics.py +++ b/tests/components/recorder/test_statistics.py @@ -2,20 +2,14 @@ from collections.abc import Callable # pylint: disable=invalid-name -from datetime import datetime, timedelta -import importlib -import sys -from unittest.mock import ANY, DEFAULT, MagicMock, patch +from datetime import timedelta +from unittest.mock import patch -import py import pytest -from sqlalchemy import create_engine, select -from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import Session +from sqlalchemy import select from homeassistant.components import recorder from homeassistant.components.recorder import Recorder, history, statistics -from homeassistant.components.recorder.const import SQLITE_URL_PREFIX from homeassistant.components.recorder.db_schema import StatisticsShortTerm from homeassistant.components.recorder.models import ( datetime_to_timestamp_or_none, @@ -26,12 +20,8 @@ from homeassistant.components.recorder.statistics import ( _generate_max_mean_min_statistic_in_sub_period_stmt, _generate_statistics_at_time_stmt, _generate_statistics_during_period_stmt, - _get_future_year, - _statistics_during_period_with_session, async_add_external_statistics, async_import_statistics, - delete_statistics_duplicates, - delete_statistics_meta_duplicates, get_last_short_term_statistics, get_last_statistics, get_latest_short_term_statistics, @@ -39,14 +29,12 @@ from homeassistant.components.recorder.statistics import ( list_statistic_ids, ) from 
homeassistant.components.recorder.table_managers.statistics_meta import ( - StatisticsMetaManager, _generate_get_metadata_stmt, ) from homeassistant.components.recorder.util import session_scope from homeassistant.components.sensor import UNIT_CONVERTERS from homeassistant.core import HomeAssistant, callback from homeassistant.exceptions import HomeAssistantError -from homeassistant.helpers import recorder as recorder_helper from homeassistant.setup import setup_component import homeassistant.util.dt as dt_util @@ -59,8 +47,8 @@ from .common import ( wait_recording_done, ) -from tests.common import get_test_home_assistant, mock_registry -from tests.typing import RecorderInstanceGenerator, WebSocketGenerator +from tests.common import mock_registry +from tests.typing import WebSocketGenerator ORIG_TZ = dt_util.DEFAULT_TIME_ZONE @@ -1254,515 +1242,6 @@ def test_monthly_statistics( dt_util.set_default_time_zone(dt_util.get_time_zone("UTC")) -def test_delete_duplicates_no_duplicates( - hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture -) -> None: - """Test removal of duplicated statistics.""" - hass = hass_recorder() - wait_recording_done(hass) - instance = recorder.get_instance(hass) - with session_scope(hass=hass) as session: - delete_statistics_duplicates(instance, hass, session) - assert "duplicated statistics rows" not in caplog.text - assert "Found non identical" not in caplog.text - assert "Found duplicated" not in caplog.text - - -def test_duplicate_statistics_handle_integrity_error( - hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture -) -> None: - """Test the recorder does not blow up if statistics is duplicated.""" - hass = hass_recorder() - wait_recording_done(hass) - - period1 = dt_util.as_utc(dt_util.parse_datetime("2021-09-01 00:00:00")) - period2 = dt_util.as_utc(dt_util.parse_datetime("2021-09-30 23:00:00")) - - external_energy_metadata_1 = { - "has_mean": False, - "has_sum": True, - "name": "Total imported energy", - "source": "test", - "statistic_id": "test:total_energy_import_tariff_1", - "unit_of_measurement": "kWh", - } - external_energy_statistics_1 = [ - { - "start": period1, - "last_reset": None, - "state": 3, - "sum": 5, - }, - ] - external_energy_statistics_2 = [ - { - "start": period2, - "last_reset": None, - "state": 3, - "sum": 6, - } - ] - - with patch.object( - statistics, "_statistics_exists", return_value=False - ), patch.object( - statistics, "_insert_statistics", wraps=statistics._insert_statistics - ) as insert_statistics_mock: - async_add_external_statistics( - hass, external_energy_metadata_1, external_energy_statistics_1 - ) - async_add_external_statistics( - hass, external_energy_metadata_1, external_energy_statistics_1 - ) - async_add_external_statistics( - hass, external_energy_metadata_1, external_energy_statistics_2 - ) - wait_recording_done(hass) - assert insert_statistics_mock.call_count == 3 - - with session_scope(hass=hass) as session: - tmp = session.query(recorder.db_schema.Statistics).all() - assert len(tmp) == 2 - - assert "Blocked attempt to insert duplicated statistic rows" in caplog.text - - -def _create_engine_28(*args, **kwargs): - """Test version of create_engine that initializes with old schema. - - This simulates an existing db with the old schema. 
- """ - module = "tests.components.recorder.db_schema_28" - importlib.import_module(module) - old_db_schema = sys.modules[module] - engine = create_engine(*args, **kwargs) - old_db_schema.Base.metadata.create_all(engine) - with Session(engine) as session: - session.add( - recorder.db_schema.StatisticsRuns(start=statistics.get_start_time()) - ) - session.add( - recorder.db_schema.SchemaChanges( - schema_version=old_db_schema.SCHEMA_VERSION - ) - ) - session.commit() - return engine - - -def test_delete_metadata_duplicates( - caplog: pytest.LogCaptureFixture, tmpdir: py.path.local -) -> None: - """Test removal of duplicated statistics.""" - test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") - dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" - - module = "tests.components.recorder.db_schema_28" - importlib.import_module(module) - old_db_schema = sys.modules[module] - - external_energy_metadata_1 = { - "has_mean": False, - "has_sum": True, - "name": "Total imported energy", - "source": "test", - "statistic_id": "test:total_energy_import_tariff_1", - "unit_of_measurement": "kWh", - } - external_energy_metadata_2 = { - "has_mean": False, - "has_sum": True, - "name": "Total imported energy", - "source": "test", - "statistic_id": "test:total_energy_import_tariff_1", - "unit_of_measurement": "kWh", - } - external_co2_metadata = { - "has_mean": True, - "has_sum": False, - "name": "Fossil percentage", - "source": "test", - "statistic_id": "test:fossil_percentage", - "unit_of_measurement": "%", - } - - # Create some duplicated statistics_meta with schema version 28 - with patch.object(recorder, "db_schema", old_db_schema), patch.object( - recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), patch( - "homeassistant.components.recorder.core.create_engine", new=_create_engine_28 - ): - hass = get_test_home_assistant() - recorder_helper.async_initialize_recorder(hass) - setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) - wait_recording_done(hass) - wait_recording_done(hass) - - with session_scope(hass=hass) as session: - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1) - ) - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2) - ) - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata) - ) - - with session_scope(hass=hass) as session: - tmp = session.query(recorder.db_schema.StatisticsMeta).all() - assert len(tmp) == 3 - assert tmp[0].id == 1 - assert tmp[0].statistic_id == "test:total_energy_import_tariff_1" - assert tmp[1].id == 2 - assert tmp[1].statistic_id == "test:total_energy_import_tariff_1" - assert tmp[2].id == 3 - assert tmp[2].statistic_id == "test:fossil_percentage" - - hass.stop() - dt_util.DEFAULT_TIME_ZONE = ORIG_TZ - - # Test that the duplicates are removed during migration from schema 28 - hass = get_test_home_assistant() - recorder_helper.async_initialize_recorder(hass) - setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) - hass.start() - wait_recording_done(hass) - wait_recording_done(hass) - - assert "Deleted 1 duplicated statistics_meta rows" in caplog.text - with session_scope(hass=hass) as session: - tmp = session.query(recorder.db_schema.StatisticsMeta).all() - assert len(tmp) == 2 - assert tmp[0].id == 2 - assert tmp[0].statistic_id == "test:total_energy_import_tariff_1" - assert tmp[1].id == 3 - assert tmp[1].statistic_id == "test:fossil_percentage" - - hass.stop() - dt_util.DEFAULT_TIME_ZONE = ORIG_TZ - - -def 
test_delete_metadata_duplicates_many( - caplog: pytest.LogCaptureFixture, tmpdir: py.path.local -) -> None: - """Test removal of duplicated statistics.""" - test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") - dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" - - module = "tests.components.recorder.db_schema_28" - importlib.import_module(module) - old_db_schema = sys.modules[module] - - external_energy_metadata_1 = { - "has_mean": False, - "has_sum": True, - "name": "Total imported energy", - "source": "test", - "statistic_id": "test:total_energy_import_tariff_1", - "unit_of_measurement": "kWh", - } - external_energy_metadata_2 = { - "has_mean": False, - "has_sum": True, - "name": "Total imported energy", - "source": "test", - "statistic_id": "test:total_energy_import_tariff_2", - "unit_of_measurement": "kWh", - } - external_co2_metadata = { - "has_mean": True, - "has_sum": False, - "name": "Fossil percentage", - "source": "test", - "statistic_id": "test:fossil_percentage", - "unit_of_measurement": "%", - } - - # Create some duplicated statistics with schema version 28 - with patch.object(recorder, "db_schema", old_db_schema), patch.object( - recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), patch( - "homeassistant.components.recorder.core.create_engine", new=_create_engine_28 - ): - hass = get_test_home_assistant() - recorder_helper.async_initialize_recorder(hass) - setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) - wait_recording_done(hass) - wait_recording_done(hass) - - with session_scope(hass=hass) as session: - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_1) - ) - for _ in range(1100): - session.add( - recorder.db_schema.StatisticsMeta.from_meta( - external_energy_metadata_1 - ) - ) - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2) - ) - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_energy_metadata_2) - ) - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata) - ) - session.add( - recorder.db_schema.StatisticsMeta.from_meta(external_co2_metadata) - ) - - hass.stop() - dt_util.DEFAULT_TIME_ZONE = ORIG_TZ - - # Test that the duplicates are removed during migration from schema 28 - hass = get_test_home_assistant() - recorder_helper.async_initialize_recorder(hass) - setup_component(hass, "recorder", {"recorder": {"db_url": dburl}}) - hass.start() - wait_recording_done(hass) - wait_recording_done(hass) - - assert "Deleted 1102 duplicated statistics_meta rows" in caplog.text - with session_scope(hass=hass) as session: - tmp = session.query(recorder.db_schema.StatisticsMeta).all() - assert len(tmp) == 3 - assert tmp[0].id == 1101 - assert tmp[0].statistic_id == "test:total_energy_import_tariff_1" - assert tmp[1].id == 1103 - assert tmp[1].statistic_id == "test:total_energy_import_tariff_2" - assert tmp[2].id == 1105 - assert tmp[2].statistic_id == "test:fossil_percentage" - - hass.stop() - dt_util.DEFAULT_TIME_ZONE = ORIG_TZ - - -def test_delete_metadata_duplicates_no_duplicates( - hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture -) -> None: - """Test removal of duplicated statistics.""" - hass = hass_recorder() - wait_recording_done(hass) - with session_scope(hass=hass) as session: - instance = recorder.get_instance(hass) - delete_statistics_meta_duplicates(instance, session) - assert "duplicated statistics_meta rows" not in caplog.text - - 
-@pytest.mark.parametrize("enable_statistics_table_validation", [True]) -@pytest.mark.parametrize("db_engine", ("mysql", "postgresql")) -async def test_validate_db_schema( - async_setup_recorder_instance: RecorderInstanceGenerator, - hass: HomeAssistant, - caplog: pytest.LogCaptureFixture, - db_engine, -) -> None: - """Test validating DB schema with MySQL and PostgreSQL. - - Note: The test uses SQLite, the purpose is only to exercise the code. - """ - with patch( - "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine - ): - await async_setup_recorder_instance(hass) - await async_wait_recording_done(hass) - assert "Schema validation failed" not in caplog.text - assert "Detected statistics schema errors" not in caplog.text - assert "Database is about to correct DB schema errors" not in caplog.text - - -@pytest.mark.parametrize("enable_statistics_table_validation", [True]) -async def test_validate_db_schema_fix_utf8_issue( - async_setup_recorder_instance: RecorderInstanceGenerator, - hass: HomeAssistant, - caplog: pytest.LogCaptureFixture, -) -> None: - """Test validating DB schema with MySQL. - - Note: The test uses SQLite, the purpose is only to exercise the code. - """ - orig_error = MagicMock() - orig_error.args = [1366] - utf8_error = OperationalError("", "", orig=orig_error) - with patch( - "homeassistant.components.recorder.core.Recorder.dialect_name", "mysql" - ), patch( - "homeassistant.components.recorder.table_managers.statistics_meta.StatisticsMetaManager.update_or_add", - wraps=StatisticsMetaManager.update_or_add, - side_effect=[utf8_error, DEFAULT, DEFAULT], - ): - await async_setup_recorder_instance(hass) - await async_wait_recording_done(hass) - - assert "Schema validation failed" not in caplog.text - assert ( - "Database is about to correct DB schema errors: statistics_meta.4-byte UTF-8" - in caplog.text - ) - assert ( - "Updating character set and collation of table statistics_meta to utf8mb4" - in caplog.text - ) - - -@pytest.mark.parametrize("enable_statistics_table_validation", [True]) -@pytest.mark.parametrize("db_engine", ("mysql", "postgresql")) -@pytest.mark.parametrize( - ("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1)) -) -@pytest.mark.parametrize( - ("column", "value"), - (("max", 1.0), ("mean", 1.0), ("min", 1.0), ("state", 1.0), ("sum", 1.0)), -) -async def test_validate_db_schema_fix_float_issue( - async_setup_recorder_instance: RecorderInstanceGenerator, - hass: HomeAssistant, - caplog: pytest.LogCaptureFixture, - db_engine, - table, - replace_index, - column, - value, -) -> None: - """Test validating DB schema with MySQL. - - Note: The test uses SQLite, the purpose is only to exercise the code. 
- """ - orig_error = MagicMock() - orig_error.args = [1366] - precise_number = 1.000000000000001 - fixed_future_year = _get_future_year() - precise_time = datetime(fixed_future_year, 10, 6, microsecond=1, tzinfo=dt_util.UTC) - statistics = { - "recorder.db_test": [ - { - "last_reset": precise_time.timestamp(), - "max": precise_number, - "mean": precise_number, - "min": precise_number, - "start": precise_time.timestamp(), - "state": precise_number, - "sum": precise_number, - } - ] - } - statistics["recorder.db_test"][0][column] = value - fake_statistics = [DEFAULT, DEFAULT] - fake_statistics[replace_index] = statistics - - with patch( - "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine - ), patch( - "homeassistant.components.recorder.statistics._get_future_year", - return_value=fixed_future_year, - ), patch( - "homeassistant.components.recorder.statistics._statistics_during_period_with_session", - side_effect=fake_statistics, - wraps=_statistics_during_period_with_session, - ), patch( - "homeassistant.components.recorder.migration._modify_columns" - ) as modify_columns_mock: - await async_setup_recorder_instance(hass) - await async_wait_recording_done(hass) - - assert "Schema validation failed" not in caplog.text - assert ( - f"Database is about to correct DB schema errors: {table}.double precision" - in caplog.text - ) - modification = [ - "mean DOUBLE PRECISION", - "min DOUBLE PRECISION", - "max DOUBLE PRECISION", - "state DOUBLE PRECISION", - "sum DOUBLE PRECISION", - ] - modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification) - - -@pytest.mark.parametrize("enable_statistics_table_validation", [True]) -@pytest.mark.parametrize( - ("db_engine", "modification"), - ( - ("mysql", ["last_reset_ts DOUBLE PRECISION", "start_ts DOUBLE PRECISION"]), - ( - "postgresql", - [ - "last_reset_ts DOUBLE PRECISION", - "start_ts DOUBLE PRECISION", - ], - ), - ), -) -@pytest.mark.parametrize( - ("table", "replace_index"), (("statistics", 0), ("statistics_short_term", 1)) -) -@pytest.mark.parametrize( - ("column", "value"), - ( - ("last_reset", "2020-10-06T00:00:00+00:00"), - ("start", "2020-10-06T00:00:00+00:00"), - ), -) -async def test_validate_db_schema_fix_statistics_datetime_issue( - async_setup_recorder_instance: RecorderInstanceGenerator, - hass: HomeAssistant, - caplog: pytest.LogCaptureFixture, - db_engine, - modification, - table, - replace_index, - column, - value, -) -> None: - """Test validating DB schema with MySQL. - - Note: The test uses SQLite, the purpose is only to exercise the code. 
- """ - orig_error = MagicMock() - orig_error.args = [1366] - precise_number = 1.000000000000001 - precise_time = datetime(2020, 10, 6, microsecond=1, tzinfo=dt_util.UTC) - statistics = { - "recorder.db_test": [ - { - "last_reset": precise_time, - "max": precise_number, - "mean": precise_number, - "min": precise_number, - "start": precise_time, - "state": precise_number, - "sum": precise_number, - } - ] - } - statistics["recorder.db_test"][0][column] = value - fake_statistics = [DEFAULT, DEFAULT] - fake_statistics[replace_index] = statistics - - with patch( - "homeassistant.components.recorder.core.Recorder.dialect_name", db_engine - ), patch( - "homeassistant.components.recorder.statistics._statistics_during_period_with_session", - side_effect=fake_statistics, - wraps=_statistics_during_period_with_session, - ), patch( - "homeassistant.components.recorder.migration._modify_columns" - ) as modify_columns_mock: - await async_setup_recorder_instance(hass) - await async_wait_recording_done(hass) - - assert "Schema validation failed" not in caplog.text - assert ( - f"Database is about to correct DB schema errors: {table}.µs precision" - in caplog.text - ) - modify_columns_mock.assert_called_once_with(ANY, ANY, table, modification) - - def test_cache_key_for_generate_statistics_during_period_stmt() -> None: """Test cache key for _generate_statistics_during_period_stmt.""" columns = select(StatisticsShortTerm.metadata_id, StatisticsShortTerm.start_ts) diff --git a/tests/conftest.py b/tests/conftest.py index 5a1c44b7819..c5197dd2bd2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1282,13 +1282,16 @@ def hass_recorder( # pylint: disable-next=import-outside-toplevel from homeassistant.components import recorder + # pylint: disable-next=import-outside-toplevel + from homeassistant.components.recorder.auto_repairs.statistics import schema + original_tz = dt_util.DEFAULT_TIME_ZONE hass = get_test_home_assistant() nightly = recorder.Recorder.async_nightly_tasks if enable_nightly_purge else None stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None stats_validate = ( - recorder.statistics.validate_db_schema + schema.validate_db_schema if enable_statistics_table_validation else itertools.repeat(set()) ) @@ -1397,13 +1400,16 @@ async def async_setup_recorder_instance( # pylint: disable-next=import-outside-toplevel from homeassistant.components import recorder + # pylint: disable-next=import-outside-toplevel + from homeassistant.components.recorder.auto_repairs.statistics import schema + # pylint: disable-next=import-outside-toplevel from .components.recorder.common import async_recorder_block_till_done nightly = recorder.Recorder.async_nightly_tasks if enable_nightly_purge else None stats = recorder.Recorder.async_periodic_statistics if enable_statistics else None stats_validate = ( - recorder.statistics.validate_db_schema + schema.validate_db_schema if enable_statistics_table_validation else itertools.repeat(set()) )