Mirror of https://github.com/home-assistant/core.git (synced 2025-06-03 20:57:08 +00:00)
Use multiple indexed group-by queries to get start time states for MySQL (#138786)
* tweaks
* mysql
* mysql
* Update homeassistant/components/recorder/history/modern.py
* Update homeassistant/components/recorder/history/modern.py
* Update homeassistant/components/recorder/const.py
* Update homeassistant/components/recorder/statistics.py
* Apply suggestions from code review
* mysql
* mysql
* cover
* make sure db is fully init on old schema
* fixes
* fixes
* coverage
* coverage
* coverage
* s/slow_dependant_subquery/slow_dependent_subquery/g
* reword
* comment that callers are responsible for staying under the limit
* comment that callers are responsible for staying under the limit
* switch to kwargs
* reduce branching complexity
* split stats query
* preen
* split tests
* split tests
This commit is contained in:
parent 0c803520a3
commit c9abe76023
@@ -30,6 +30,12 @@ CONF_DB_INTEGRITY_CHECK = "db_integrity_check"
MAX_QUEUE_BACKLOG_MIN_VALUE = 65000
MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG = 256 * 1024**2

# As soon as we have more than 999 ids, split the query as the
# MySQL optimizer handles it poorly and will no longer
# do an index only scan with a group-by
# https://github.com/home-assistant/core/issues/132865#issuecomment-2543160459
MAX_IDS_FOR_INDEXED_GROUP_BY = 999

# The maximum number of rows (events) we purge in one delete statement

DEFAULT_MAX_BIND_VARS = 4000
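The new `MAX_IDS_FOR_INDEXED_GROUP_BY` constant caps how many metadata ids may appear in one `IN (...)` list when the start time state is fetched with a group-by. As a rough illustration of how callers stay under that cap, here is a minimal, self-contained sketch; `chunked_or_all_sketch` is a hypothetical stand-in for the `chunked_or_all` helper the diff imports from `homeassistant.util.collection`, not the real implementation.

```python
# Illustrative sketch only: a stand-in for homeassistant.util.collection.chunked_or_all.
from collections.abc import Collection, Iterator
from itertools import islice

MAX_IDS_FOR_INDEXED_GROUP_BY = 999  # value taken from the hunk above


def chunked_or_all_sketch(
    ids: Collection[int], chunk_size: int
) -> Iterator[Collection[int]]:
    """Yield the whole collection if it fits, otherwise fixed-size chunks."""
    if len(ids) <= chunk_size:
        # Common case: one indexed group-by query covers everything.
        yield ids
        return
    iterator = iter(ids)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk


if __name__ == "__main__":
    metadata_ids = list(range(2500))
    sizes = [
        len(chunk)
        for chunk in chunked_or_all_sketch(metadata_ids, MAX_IDS_FOR_INDEXED_GROUP_BY)
    ]
    print(sizes)  # [999, 999, 502] -> three indexed group-by queries instead of one
```

Keeping every chunk at or below 999 ids is what lets MySQL keep using the group_index_range optimization referenced in the comment above.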
@@ -6,11 +6,12 @@ from collections.abc import Callable, Iterable, Iterator
from datetime import datetime
from itertools import groupby
from operator import itemgetter
from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast

from sqlalchemy import (
    CompoundSelect,
    Select,
    StatementLambdaElement,
    Subquery,
    and_,
    func,
@@ -26,8 +27,9 @@ from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_
from homeassistant.core import HomeAssistant, State, split_entity_id
from homeassistant.helpers.recorder import get_instance
from homeassistant.util import dt as dt_util
from homeassistant.util.collection import chunked_or_all

from ..const import LAST_REPORTED_SCHEMA_VERSION
from ..const import LAST_REPORTED_SCHEMA_VERSION, MAX_IDS_FOR_INDEXED_GROUP_BY
from ..db_schema import (
    SHARED_ATTR_OR_LEGACY_ATTRIBUTES,
    StateAttributes,
@ -149,6 +151,7 @@ def _significant_states_stmt(
|
||||
no_attributes: bool,
|
||||
include_start_time_state: bool,
|
||||
run_start_ts: float | None,
|
||||
slow_dependent_subquery: bool,
|
||||
) -> Select | CompoundSelect:
|
||||
"""Query the database for significant state changes."""
|
||||
include_last_changed = not significant_changes_only
|
||||
@ -187,6 +190,7 @@ def _significant_states_stmt(
|
||||
metadata_ids,
|
||||
no_attributes,
|
||||
include_last_changed,
|
||||
slow_dependent_subquery,
|
||||
).subquery(),
|
||||
no_attributes,
|
||||
include_last_changed,
|
||||
@ -257,7 +261,68 @@ def get_significant_states_with_session(
|
||||
start_time_ts = start_time.timestamp()
|
||||
end_time_ts = datetime_to_timestamp_or_none(end_time)
|
||||
single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None
|
||||
stmt = lambda_stmt(
|
||||
rows: list[Row] = []
|
||||
if TYPE_CHECKING:
|
||||
assert instance.database_engine is not None
|
||||
slow_dependent_subquery = instance.database_engine.optimizer.slow_dependent_subquery
|
||||
if include_start_time_state and slow_dependent_subquery:
|
||||
# https://github.com/home-assistant/core/issues/137178
|
||||
# If we include the start time state we need to limit the
|
||||
# number of metadata_ids we query for at a time to avoid
|
||||
# hitting limits in the MySQL optimizer that prevent
|
||||
# the start time state query from using an index-only optimization
|
||||
# to find the start time state.
|
||||
iter_metadata_ids = chunked_or_all(metadata_ids, MAX_IDS_FOR_INDEXED_GROUP_BY)
|
||||
else:
|
||||
iter_metadata_ids = (metadata_ids,)
|
||||
for metadata_ids_chunk in iter_metadata_ids:
|
||||
stmt = _generate_significant_states_with_session_stmt(
|
||||
start_time_ts,
|
||||
end_time_ts,
|
||||
single_metadata_id,
|
||||
metadata_ids_chunk,
|
||||
metadata_ids_in_significant_domains,
|
||||
significant_changes_only,
|
||||
no_attributes,
|
||||
include_start_time_state,
|
||||
oldest_ts,
|
||||
slow_dependent_subquery,
|
||||
)
|
||||
row_chunk = cast(
|
||||
list[Row],
|
||||
execute_stmt_lambda_element(session, stmt, None, end_time, orm_rows=False),
|
||||
)
|
||||
if rows:
|
||||
rows += row_chunk
|
||||
else:
|
||||
# If we have no rows yet, we can just assign the chunk
|
||||
# as this is the common case since its rare that
|
||||
# we exceed the MAX_IDS_FOR_INDEXED_GROUP_BY limit
|
||||
rows = row_chunk
|
||||
return _sorted_states_to_dict(
|
||||
rows,
|
||||
start_time_ts if include_start_time_state else None,
|
||||
entity_ids,
|
||||
entity_id_to_metadata_id,
|
||||
minimal_response,
|
||||
compressed_state_format,
|
||||
no_attributes=no_attributes,
|
||||
)
|
||||
|
||||
|
||||
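Because the rows for each metadata_id are independent of every other id, running the statement once per chunk and concatenating the row lists yields the same row set as one large query would. A simplified sketch of that chunk-and-merge pattern follows; the `fetch_rows` callable is a placeholder for executing one generated statement, not a recorder API.

```python
from collections.abc import Callable, Iterable
from typing import Any

Row = tuple[Any, ...]  # placeholder for sqlalchemy.engine.Row


def rows_for_all_chunks(
    chunks: Iterable[list[int]],
    fetch_rows: Callable[[list[int]], list[Row]],
) -> list[Row]:
    """Execute one statement per id chunk and merge the results."""
    rows: list[Row] = []
    for chunk in chunks:
        row_chunk = fetch_rows(chunk)
        if rows:
            rows += row_chunk
        else:
            # Common single-chunk case: adopt the list instead of copying it.
            rows = row_chunk
    return rows
```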
def _generate_significant_states_with_session_stmt(
|
||||
start_time_ts: float,
|
||||
end_time_ts: float | None,
|
||||
single_metadata_id: int | None,
|
||||
metadata_ids: list[int],
|
||||
metadata_ids_in_significant_domains: list[int],
|
||||
significant_changes_only: bool,
|
||||
no_attributes: bool,
|
||||
include_start_time_state: bool,
|
||||
oldest_ts: float | None,
|
||||
slow_dependent_subquery: bool,
|
||||
) -> StatementLambdaElement:
|
||||
return lambda_stmt(
|
||||
lambda: _significant_states_stmt(
|
||||
start_time_ts,
|
||||
end_time_ts,
|
||||
@ -268,6 +333,7 @@ def get_significant_states_with_session(
|
||||
no_attributes,
|
||||
include_start_time_state,
|
||||
oldest_ts,
|
||||
slow_dependent_subquery,
|
||||
),
|
||||
track_on=[
|
||||
bool(single_metadata_id),
|
||||
@ -276,17 +342,9 @@ def get_significant_states_with_session(
|
||||
significant_changes_only,
|
||||
no_attributes,
|
||||
include_start_time_state,
|
||||
slow_dependent_subquery,
|
||||
],
|
||||
)
|
||||
return _sorted_states_to_dict(
|
||||
execute_stmt_lambda_element(session, stmt, None, end_time, orm_rows=False),
|
||||
start_time_ts if include_start_time_state else None,
|
||||
entity_ids,
|
||||
entity_id_to_metadata_id,
|
||||
minimal_response,
|
||||
compressed_state_format,
|
||||
no_attributes=no_attributes,
|
||||
)
|
||||
|
||||
|
||||
def get_full_significant_states_with_session(
|
||||
@ -554,13 +612,14 @@ def get_last_state_changes(
|
||||
)
|
||||
|
||||
|
||||
def _get_start_time_state_for_entities_stmt(
|
||||
def _get_start_time_state_for_entities_stmt_dependent_sub_query(
|
||||
epoch_time: float,
|
||||
metadata_ids: list[int],
|
||||
no_attributes: bool,
|
||||
include_last_changed: bool,
|
||||
) -> Select:
|
||||
"""Baked query to get states for specific entities."""
|
||||
# Engine has a fast dependent subquery optimizer
|
||||
# This query is the result of significant research in
|
||||
# https://github.com/home-assistant/core/issues/132865
|
||||
# A reverse index scan with a limit 1 is the fastest way to get the
|
||||
@ -570,7 +629,9 @@ def _get_start_time_state_for_entities_stmt(
|
||||
# before a specific point in time for all entities.
|
||||
stmt = (
|
||||
_stmt_and_join_attributes_for_start_state(
|
||||
no_attributes, include_last_changed, False
|
||||
no_attributes=no_attributes,
|
||||
include_last_changed=include_last_changed,
|
||||
include_last_reported=False,
|
||||
)
|
||||
.select_from(StatesMeta)
|
||||
.join(
|
||||
@@ -600,6 +661,55 @@
    )


def _get_start_time_state_for_entities_stmt_group_by(
    epoch_time: float,
    metadata_ids: list[int],
    no_attributes: bool,
    include_last_changed: bool,
) -> Select:
    """Baked query to get states for specific entities."""
    # Simple group-by for MySQL, must use less
    # than 1000 metadata_ids in the IN clause for MySQL
    # or it will optimize poorly. Callers are responsible
    # for ensuring that the number of metadata_ids is less
    # than 1000.
    most_recent_states_for_entities_by_date = (
        select(
            States.metadata_id.label("max_metadata_id"),
            func.max(States.last_updated_ts).label("max_last_updated"),
        )
        .filter(
            (States.last_updated_ts < epoch_time) & States.metadata_id.in_(metadata_ids)
        )
        .group_by(States.metadata_id)
        .subquery()
    )
    stmt = (
        _stmt_and_join_attributes_for_start_state(
            no_attributes=no_attributes,
            include_last_changed=include_last_changed,
            include_last_reported=False,
        )
        .join(
            most_recent_states_for_entities_by_date,
            and_(
                States.metadata_id
                == most_recent_states_for_entities_by_date.c.max_metadata_id,
                States.last_updated_ts
                == most_recent_states_for_entities_by_date.c.max_last_updated,
            ),
        )
        .filter(
            (States.last_updated_ts < epoch_time) & States.metadata_id.in_(metadata_ids)
        )
    )
    if no_attributes:
        return stmt
    return stmt.outerjoin(
        StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
    )

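For reference, the two query shapes this change switches between can be sketched against a toy table (this is not the recorder schema, and the columns are simplified). The first form uses a dependent (correlated) subquery, which MySQL 8.x may resolve with a per-row file sort; the second is the indexed group-by plus join-back that the new `*_group_by` helpers build, which stays fast as long as the `IN` list holds at most 999 ids.

```python
from sqlalchemy import Column, Float, Integer, MetaData, Table, and_, func, select

metadata_obj = MetaData()
states = Table(
    "states",
    metadata_obj,
    Column("metadata_id", Integer),
    Column("last_updated_ts", Float),
    Column("state", Integer),
)

cutoff = 1_000_000.0
ids = [1, 2, 3]

# 1) Dependent (correlated) subquery: one "latest timestamp" lookup per outer row.
outer = states.alias("s_outer")
inner = states.alias("s_inner")
latest_ts_for_row = (
    select(func.max(inner.c.last_updated_ts))
    .where(inner.c.metadata_id == outer.c.metadata_id)
    .where(inner.c.last_updated_ts < cutoff)
    .scalar_subquery()
)
dependent_subquery_stmt = (
    select(outer.c.metadata_id, outer.c.state)
    .where(outer.c.metadata_id.in_(ids))
    .where(outer.c.last_updated_ts == latest_ts_for_row)
)

# 2) Indexed group-by + join back: compute max(last_updated_ts) per id once,
#    then join to pick the matching rows.
latest_per_id = (
    select(
        states.c.metadata_id.label("max_metadata_id"),
        func.max(states.c.last_updated_ts).label("max_last_updated"),
    )
    .where(states.c.last_updated_ts < cutoff)
    .where(states.c.metadata_id.in_(ids))
    .group_by(states.c.metadata_id)
    .subquery()
)
group_by_stmt = select(states.c.metadata_id, states.c.state).join(
    latest_per_id,
    and_(
        states.c.metadata_id == latest_per_id.c.max_metadata_id,
        states.c.last_updated_ts == latest_per_id.c.max_last_updated,
    ),
)

print(dependent_subquery_stmt)
print(group_by_stmt)
```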
def _get_oldest_possible_ts(
|
||||
hass: HomeAssistant, utc_point_in_time: datetime
|
||||
) -> float | None:
|
||||
@ -620,6 +730,7 @@ def _get_start_time_state_stmt(
|
||||
metadata_ids: list[int],
|
||||
no_attributes: bool,
|
||||
include_last_changed: bool,
|
||||
slow_dependent_subquery: bool,
|
||||
) -> Select:
|
||||
"""Return the states at a specific point in time."""
|
||||
if single_metadata_id:
|
||||
@ -634,7 +745,15 @@ def _get_start_time_state_stmt(
|
||||
)
|
||||
# We have more than one entity to look at so we need to do a query on states
|
||||
# since the last recorder run started.
|
||||
return _get_start_time_state_for_entities_stmt(
|
||||
if slow_dependent_subquery:
|
||||
return _get_start_time_state_for_entities_stmt_group_by(
|
||||
epoch_time,
|
||||
metadata_ids,
|
||||
no_attributes,
|
||||
include_last_changed,
|
||||
)
|
||||
|
||||
return _get_start_time_state_for_entities_stmt_dependent_sub_query(
|
||||
epoch_time,
|
||||
metadata_ids,
|
||||
no_attributes,
|
||||
|
@@ -37,3 +37,13 @@ class DatabaseOptimizer:
    # https://wiki.postgresql.org/wiki/Loose_indexscan
    # https://github.com/home-assistant/core/issues/126084
    slow_range_in_select: bool

    # MySQL 8.x+ can end up with a file-sort on a dependent subquery
    # which makes the query painfully slow.
    # https://github.com/home-assistant/core/issues/137178
    # The solution is to use multiple indexed group-by queries instead
    # of the subquery as long as the group by does not exceed
    # 999 elements since as soon as we hit 1000 elements MySQL
    # will no longer use the group_index_range optimization.
    # https://github.com/home-assistant/core/issues/132865#issuecomment-2543160459
    slow_dependent_subquery: bool
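A minimal sketch (names simplified, not the recorder API) of how query code is expected to branch on the new flag, mirroring what `_statistics_at_time` and `_get_start_time_state_stmt` do elsewhere in this diff:

```python
from dataclasses import dataclass


@dataclass
class DatabaseOptimizer:
    slow_range_in_select: bool
    slow_dependent_subquery: bool


def pick_start_time_strategy(optimizer: DatabaseOptimizer) -> str:
    """Choose the statement builder based on the optimizer hints."""
    if optimizer.slow_dependent_subquery:
        # MySQL: several indexed group-by queries, each <= 999 metadata ids.
        return "group_by"
    # SQLite, PostgreSQL and MariaDB keep the single dependent-subquery statement.
    return "dependent_subquery"


print(pick_start_time_strategy(DatabaseOptimizer(False, True)))   # group_by
print(pick_start_time_strategy(DatabaseOptimizer(False, False)))  # dependent_subquery
```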
@@ -28,6 +28,7 @@ from homeassistant.helpers.recorder import DATA_RECORDER
from homeassistant.helpers.singleton import singleton
from homeassistant.helpers.typing import UNDEFINED, UndefinedType
from homeassistant.util import dt as dt_util
from homeassistant.util.collection import chunked_or_all
from homeassistant.util.unit_conversion import (
    AreaConverter,
    BaseUnitConverter,
@@ -59,6 +60,7 @@ from .const import (
    INTEGRATION_PLATFORM_LIST_STATISTIC_IDS,
    INTEGRATION_PLATFORM_UPDATE_STATISTICS_ISSUES,
    INTEGRATION_PLATFORM_VALIDATE_STATISTICS,
    MAX_IDS_FOR_INDEXED_GROUP_BY,
    SupportedDialect,
)
from .db_schema import (
@ -1669,6 +1671,7 @@ def _augment_result_with_change(
|
||||
drop_sum = "sum" not in _types
|
||||
prev_sums = {}
|
||||
if tmp := _statistics_at_time(
|
||||
get_instance(hass),
|
||||
session,
|
||||
{metadata[statistic_id][0] for statistic_id in result},
|
||||
table,
|
||||
@ -2027,7 +2030,39 @@ def get_latest_short_term_statistics_with_session(
|
||||
)
|
||||
|
||||
|
||||
def _generate_statistics_at_time_stmt(
|
||||
def _generate_statistics_at_time_stmt_group_by(
|
||||
table: type[StatisticsBase],
|
||||
metadata_ids: set[int],
|
||||
start_time_ts: float,
|
||||
types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
|
||||
) -> StatementLambdaElement:
|
||||
"""Create the statement for finding the statistics for a given time."""
|
||||
# Simple group-by for MySQL, must use less
|
||||
# than 1000 metadata_ids in the IN clause for MySQL
|
||||
# or it will optimize poorly. Callers are responsible
|
||||
# for ensuring that the number of metadata_ids is less
|
||||
# than 1000.
|
||||
return _generate_select_columns_for_types_stmt(table, types) + (
|
||||
lambda q: q.join(
|
||||
most_recent_statistic_ids := (
|
||||
select(
|
||||
func.max(table.start_ts).label("max_start_ts"),
|
||||
table.metadata_id.label("max_metadata_id"),
|
||||
)
|
||||
.filter(table.start_ts < start_time_ts)
|
||||
.filter(table.metadata_id.in_(metadata_ids))
|
||||
.group_by(table.metadata_id)
|
||||
.subquery()
|
||||
),
|
||||
and_(
|
||||
table.start_ts == most_recent_statistic_ids.c.max_start_ts,
|
||||
table.metadata_id == most_recent_statistic_ids.c.max_metadata_id,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _generate_statistics_at_time_stmt_dependent_sub_query(
|
||||
table: type[StatisticsBase],
|
||||
metadata_ids: set[int],
|
||||
start_time_ts: float,
|
||||
@ -2041,8 +2076,7 @@ def _generate_statistics_at_time_stmt(
|
||||
# databases. Since all databases support this query as a join
|
||||
# condition we can use it as a subquery to get the last start_time_ts
|
||||
# before a specific point in time for all entities.
|
||||
stmt = _generate_select_columns_for_types_stmt(table, types)
|
||||
stmt += (
|
||||
return _generate_select_columns_for_types_stmt(table, types) + (
|
||||
lambda q: q.select_from(StatisticsMeta)
|
||||
.join(
|
||||
table,
|
||||
@ -2064,10 +2098,10 @@ def _generate_statistics_at_time_stmt(
|
||||
)
|
||||
.where(table.metadata_id.in_(metadata_ids))
|
||||
)
|
||||
return stmt
|
||||
|
||||
|
||||
def _statistics_at_time(
    instance: Recorder,
    session: Session,
    metadata_ids: set[int],
    table: type[StatisticsBase],
@@ -2076,8 +2110,41 @@ def _statistics_at_time(
) -> Sequence[Row] | None:
    """Return last known statistics, earlier than start_time, for the metadata_ids."""
    start_time_ts = start_time.timestamp()
    stmt = _generate_statistics_at_time_stmt(table, metadata_ids, start_time_ts, types)
    return cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))
    if TYPE_CHECKING:
        assert instance.database_engine is not None
    if not instance.database_engine.optimizer.slow_dependent_subquery:
        stmt = _generate_statistics_at_time_stmt_dependent_sub_query(
            table=table,
            metadata_ids=metadata_ids,
            start_time_ts=start_time_ts,
            types=types,
        )
        return cast(list[Row], execute_stmt_lambda_element(session, stmt))
    rows: list[Row] = []
    # https://github.com/home-assistant/core/issues/132865
    # If we include the start time state we need to limit the
    # number of metadata_ids we query for at a time to avoid
    # hitting limits in the MySQL optimizer that prevent
    # the start time state query from using an index-only optimization
    # to find the start time state.
    for metadata_ids_chunk in chunked_or_all(
        metadata_ids, MAX_IDS_FOR_INDEXED_GROUP_BY
    ):
        stmt = _generate_statistics_at_time_stmt_group_by(
            table=table,
            metadata_ids=metadata_ids_chunk,
            start_time_ts=start_time_ts,
            types=types,
        )
        row_chunk = cast(list[Row], execute_stmt_lambda_element(session, stmt))
        if rows:
            rows += row_chunk
        else:
            # If we have no rows yet, we can just assign the chunk
            # as this is the common case since its rare that
            # we exceed the MAX_IDS_FOR_INDEXED_GROUP_BY limit
            rows = row_chunk
    return rows


def _build_sum_converted_stats(
@@ -464,6 +464,7 @@ def setup_connection_for_dialect(
    """Execute statements needed for dialect connection."""
    version: AwesomeVersion | None = None
    slow_range_in_select = False
    slow_dependent_subquery = False
    if dialect_name == SupportedDialect.SQLITE:
        if first_connection:
            old_isolation = dbapi_connection.isolation_level  # type: ignore[attr-defined]
@@ -505,9 +506,8 @@ def setup_connection_for_dialect(
            result = query_on_connection(dbapi_connection, "SELECT VERSION()")
            version_string = result[0][0]
            version = _extract_version_from_server_response(version_string)
            is_maria_db = "mariadb" in version_string.lower()

            if is_maria_db:
            if "mariadb" in version_string.lower():
                if not version or version < MIN_VERSION_MARIA_DB:
                    _raise_if_version_unsupported(
                        version or version_string, "MariaDB", MIN_VERSION_MARIA_DB
@@ -523,19 +523,21 @@ def setup_connection_for_dialect(
                        instance.hass,
                        version,
                    )

                slow_range_in_select = bool(
                    not version
                    or version < MARIADB_WITH_FIXED_IN_QUERIES_105
                    or MARIA_DB_106 <= version < MARIADB_WITH_FIXED_IN_QUERIES_106
                    or MARIA_DB_107 <= version < MARIADB_WITH_FIXED_IN_QUERIES_107
                    or MARIA_DB_108 <= version < MARIADB_WITH_FIXED_IN_QUERIES_108
                )
            elif not version or version < MIN_VERSION_MYSQL:
                _raise_if_version_unsupported(
                    version or version_string, "MySQL", MIN_VERSION_MYSQL
                )

            slow_range_in_select = bool(
                not version
                or version < MARIADB_WITH_FIXED_IN_QUERIES_105
                or MARIA_DB_106 <= version < MARIADB_WITH_FIXED_IN_QUERIES_106
                or MARIA_DB_107 <= version < MARIADB_WITH_FIXED_IN_QUERIES_107
                or MARIA_DB_108 <= version < MARIADB_WITH_FIXED_IN_QUERIES_108
            )
            else:
                # MySQL
                # https://github.com/home-assistant/core/issues/137178
                slow_dependent_subquery = True

        # Ensure all times are using UTC to avoid issues with daylight savings
        execute_on_connection(dbapi_connection, "SET time_zone = '+00:00'")
@@ -565,7 +567,10 @@ def setup_connection_for_dialect(
    return DatabaseEngine(
        dialect=SupportedDialect(dialect_name),
        version=version,
        optimizer=DatabaseOptimizer(slow_range_in_select=slow_range_in_select),
        optimizer=DatabaseOptimizer(
            slow_range_in_select=slow_range_in_select,
            slow_dependent_subquery=slow_dependent_subquery,
        ),
        max_bind_vars=DEFAULT_MAX_BIND_VARS,
    )
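Reduced to pure Python, the detection added above boils down to: only stock MySQL gets the slow_dependent_subquery workaround, while MariaDB (which also connects through the mysql dialect), SQLite and PostgreSQL do not. The sketch below ignores the first_connection guard and the minimum-version checks, so it is an approximation of the real function, not a drop-in replacement.

```python
def derive_slow_dependent_subquery(dialect_name: str, version_string: str) -> bool:
    """Approximate the flag decision made in setup_connection_for_dialect above."""
    if dialect_name != "mysql":
        # SQLite and PostgreSQL never take the group-by workaround.
        return False
    # MariaDB reports through the mysql dialect but keeps the fast subquery plan.
    return "mariadb" not in version_string.lower()


assert derive_slow_dependent_subquery("mysql", "8.4.0") is True
assert derive_slow_dependent_subquery("mysql", "11.4.2-MariaDB") is False
assert derive_slow_dependent_subquery("sqlite", "3.45.1") is False
assert derive_slow_dependent_subquery("postgresql", "16.3") is False
```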
@ -1,5 +1,7 @@
|
||||
"""The tests the History component websocket_api."""
|
||||
|
||||
from collections.abc import Generator
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import recorder
|
||||
@ -17,9 +19,9 @@ from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def db_schema_32():
|
||||
def db_schema_32(hass: HomeAssistant) -> Generator[None]:
|
||||
"""Fixture to initialize the db with the old schema 32."""
|
||||
with old_db_schema("32"):
|
||||
with old_db_schema(hass, "32"):
|
||||
yield
|
||||
|
||||
|
||||
|
@ -15,7 +15,7 @@ from typing import Any, Literal, cast
|
||||
from unittest.mock import MagicMock, patch, sentinel
|
||||
|
||||
from freezegun import freeze_time
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy import create_engine, event as sqlalchemy_event
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
from homeassistant import core as ha
|
||||
@ -414,7 +414,15 @@ def create_engine_test_for_schema_version_postfix(
|
||||
schema_module = get_schema_module_path(schema_version_postfix)
|
||||
importlib.import_module(schema_module)
|
||||
old_db_schema = sys.modules[schema_module]
|
||||
instance: Recorder | None = None
|
||||
if "hass" in kwargs:
|
||||
hass: HomeAssistant = kwargs.pop("hass")
|
||||
instance = recorder.get_instance(hass)
|
||||
engine = create_engine(*args, **kwargs)
|
||||
if instance is not None:
|
||||
instance = recorder.get_instance(hass)
|
||||
instance.engine = engine
|
||||
sqlalchemy_event.listen(engine, "connect", instance._setup_recorder_connection)
|
||||
old_db_schema.Base.metadata.create_all(engine)
|
||||
with Session(engine) as session:
|
||||
session.add(
|
||||
@ -435,7 +443,7 @@ def get_schema_module_path(schema_version_postfix: str) -> str:
|
||||
|
||||
|
||||
@contextmanager
|
||||
def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
|
||||
def old_db_schema(hass: HomeAssistant, schema_version_postfix: str) -> Iterator[None]:
|
||||
"""Fixture to initialize the db with the old schema."""
|
||||
schema_module = get_schema_module_path(schema_version_postfix)
|
||||
importlib.import_module(schema_module)
|
||||
@ -455,6 +463,7 @@ def old_db_schema(schema_version_postfix: str) -> Iterator[None]:
|
||||
CREATE_ENGINE_TARGET,
|
||||
new=partial(
|
||||
create_engine_test_for_schema_version_postfix,
|
||||
hass=hass,
|
||||
schema_version_postfix=schema_version_postfix,
|
||||
),
|
||||
),
|
||||
|
@ -13,6 +13,7 @@ from sqlalchemy.orm.session import Session
|
||||
|
||||
from homeassistant.components import recorder
|
||||
from homeassistant.components.recorder import db_schema
|
||||
from homeassistant.components.recorder.const import MAX_IDS_FOR_INDEXED_GROUP_BY
|
||||
from homeassistant.components.recorder.util import session_scope
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
@ -190,3 +191,9 @@ def instrument_migration(
|
||||
instrumented_migration.live_migration_done_stall.set()
|
||||
instrumented_migration.non_live_migration_done_stall.set()
|
||||
yield instrumented_migration
|
||||
|
||||
|
||||
@pytest.fixture(params=[1, 2, MAX_IDS_FOR_INDEXED_GROUP_BY])
|
||||
def ids_for_start_time_chunk_sizes(request: pytest.FixtureRequest) -> int:
|
||||
"""Fixture to test different chunk sizes for start time query."""
|
||||
return request.param
|
||||
|
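The new `ids_for_start_time_chunk_sizes` fixture is combined in the test modules below with a patch of `MAX_IDS_FOR_INDEXED_GROUP_BY`, so even a test with only a couple of entities exercises the chunked code path. A condensed sketch of that combination (mirroring the `multiple_start_time_chunk_sizes` fixture added in test_history.py; not new test code):

```python
from collections.abc import Generator
from unittest.mock import patch

import pytest

from homeassistant.components.recorder.const import MAX_IDS_FOR_INDEXED_GROUP_BY


@pytest.fixture(params=[1, 2, MAX_IDS_FOR_INDEXED_GROUP_BY])
def ids_for_start_time_chunk_sizes(request: pytest.FixtureRequest) -> int:
    """Chunk sizes to exercise for the start time query."""
    return request.param


@pytest.fixture
def multiple_start_time_chunk_sizes(
    ids_for_start_time_chunk_sizes: int,
) -> Generator[None]:
    """Force the recorder to split the start time query into small chunks."""
    with patch(
        "homeassistant.components.recorder.history.modern.MAX_IDS_FOR_INDEXED_GROUP_BY",
        ids_for_start_time_chunk_sizes,
    ):
        yield
```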
@ -1,6 +1,6 @@
|
||||
"""The tests for the recorder filter matching the EntityFilter component."""
|
||||
|
||||
from collections.abc import AsyncGenerator
|
||||
from collections.abc import AsyncGenerator, Generator
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
@ -32,12 +32,21 @@ from homeassistant.helpers.entityfilter import (
|
||||
|
||||
from .common import async_wait_recording_done, old_db_schema
|
||||
|
||||
from tests.typing import RecorderInstanceContextManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mock_recorder_before_hass(
|
||||
async_test_recorder: RecorderInstanceContextManager,
|
||||
) -> None:
|
||||
"""Set up recorder."""
|
||||
|
||||
|
||||
# This test is for schema 37 and below (32 is new enough to test)
|
||||
@pytest.fixture(autouse=True)
|
||||
def db_schema_32():
|
||||
def db_schema_32(hass: HomeAssistant) -> Generator[None]:
|
||||
"""Fixture to initialize the db with the old schema 32."""
|
||||
with old_db_schema("32"):
|
||||
with old_db_schema(hass, "32"):
|
||||
yield
|
||||
|
||||
|
||||
|
@ -2,10 +2,11 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from copy import copy
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
from unittest.mock import sentinel
|
||||
from unittest.mock import patch, sentinel
|
||||
|
||||
from freezegun import freeze_time
|
||||
import pytest
|
||||
@ -36,6 +37,24 @@ from .common import (
|
||||
from tests.typing import RecorderInstanceContextManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def multiple_start_time_chunk_sizes(
|
||||
ids_for_start_time_chunk_sizes: int,
|
||||
) -> Generator[None]:
|
||||
"""Fixture to test different chunk sizes for start time query.
|
||||
|
||||
Force the recorder to use different chunk sizes for start time query.
|
||||
|
||||
In effect this forces get_significant_states_with_session
|
||||
to call _generate_significant_states_with_session_stmt multiple times.
|
||||
"""
|
||||
with patch(
|
||||
"homeassistant.components.recorder.history.modern.MAX_IDS_FOR_INDEXED_GROUP_BY",
|
||||
ids_for_start_time_chunk_sizes,
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mock_recorder_before_hass(
|
||||
async_test_recorder: RecorderInstanceContextManager,
|
||||
@ -429,6 +448,7 @@ async def test_ensure_state_can_be_copied(
|
||||
assert_states_equal_without_context(copy(hist[entity_id][1]), hist[entity_id][1])
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
async def test_get_significant_states(hass: HomeAssistant) -> None:
|
||||
"""Test that only significant states are returned.
|
||||
|
||||
@ -443,6 +463,7 @@ async def test_get_significant_states(hass: HomeAssistant) -> None:
|
||||
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
async def test_get_significant_states_minimal_response(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
@ -512,6 +533,7 @@ async def test_get_significant_states_minimal_response(
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("time_zone", ["Europe/Berlin", "US/Hawaii", "UTC"])
|
||||
async def test_get_significant_states_with_initial(
|
||||
time_zone, hass: HomeAssistant
|
||||
@ -544,6 +566,7 @@ async def test_get_significant_states_with_initial(
|
||||
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
async def test_get_significant_states_without_initial(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
@ -578,6 +601,7 @@ async def test_get_significant_states_without_initial(
|
||||
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
async def test_get_significant_states_entity_id(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
@ -596,6 +620,7 @@ async def test_get_significant_states_entity_id(
|
||||
assert_dict_of_states_equal_without_context_and_last_changed(states, hist)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
async def test_get_significant_states_multiple_entity_ids(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from copy import copy
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
@ -50,9 +51,9 @@ def disable_states_meta_manager():
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def db_schema_32():
|
||||
def db_schema_32(hass: HomeAssistant) -> Generator[None]:
|
||||
"""Fixture to initialize the db with the old schema 32."""
|
||||
with old_db_schema("32"):
|
||||
with old_db_schema(hass, "32"):
|
||||
yield
|
||||
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Generator
|
||||
from copy import copy
|
||||
from datetime import datetime, timedelta
|
||||
import json
|
||||
@ -42,9 +43,9 @@ async def mock_recorder_before_hass(
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def db_schema_42():
|
||||
def db_schema_42(hass: HomeAssistant) -> Generator[None]:
|
||||
"""Fixture to initialize the db with the old schema 42."""
|
||||
with old_db_schema("42"):
|
||||
with old_db_schema(hass, "42"):
|
||||
yield
|
||||
|
||||
|
||||
|
@ -58,9 +58,9 @@ async def mock_recorder_before_hass(
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def db_schema_32():
|
||||
def db_schema_32(hass: HomeAssistant) -> Generator[None]:
|
||||
"""Fixture to initialize the db with the old schema 32."""
|
||||
with old_db_schema("32"):
|
||||
with old_db_schema(hass, "32"):
|
||||
yield
|
||||
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""The tests for sensor recorder platform."""
|
||||
|
||||
from collections.abc import Generator
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
from unittest.mock import ANY, Mock, patch
|
||||
@ -18,7 +19,8 @@ from homeassistant.components.recorder.statistics import (
|
||||
STATISTIC_UNIT_TO_UNIT_CONVERTER,
|
||||
PlatformCompiledStatistics,
|
||||
_generate_max_mean_min_statistic_in_sub_period_stmt,
|
||||
_generate_statistics_at_time_stmt,
|
||||
_generate_statistics_at_time_stmt_dependent_sub_query,
|
||||
_generate_statistics_at_time_stmt_group_by,
|
||||
_generate_statistics_during_period_stmt,
|
||||
async_add_external_statistics,
|
||||
async_import_statistics,
|
||||
@ -57,6 +59,24 @@ from tests.common import MockPlatform, mock_platform
|
||||
from tests.typing import RecorderInstanceContextManager, WebSocketGenerator
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def multiple_start_time_chunk_sizes(
|
||||
ids_for_start_time_chunk_sizes: int,
|
||||
) -> Generator[None]:
|
||||
"""Fixture to test different chunk sizes for start time query.
|
||||
|
||||
Force the statistics query to use different chunk sizes for start time query.
|
||||
|
||||
In effect this forces _statistics_at_time
|
||||
to call _generate_statistics_at_time_stmt_group_by multiple times.
|
||||
"""
|
||||
with patch(
|
||||
"homeassistant.components.recorder.statistics.MAX_IDS_FOR_INDEXED_GROUP_BY",
|
||||
ids_for_start_time_chunk_sizes,
|
||||
):
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mock_recorder_before_hass(
|
||||
async_test_recorder: RecorderInstanceContextManager,
|
||||
@ -1113,6 +1133,7 @@ async def test_import_statistics_errors(
|
||||
assert get_metadata(hass, statistic_ids={"sensor.total_energy_import"}) == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_daily_statistics_sum(
|
||||
@ -1293,6 +1314,215 @@ async def test_daily_statistics_sum(
|
||||
assert stats == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_multiple_daily_statistics_sum(
|
||||
hass: HomeAssistant,
|
||||
setup_recorder: None,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
timezone,
|
||||
) -> None:
|
||||
"""Test daily statistics."""
|
||||
await hass.config.async_set_time_zone(timezone)
|
||||
await async_wait_recording_done(hass)
|
||||
assert "Compiling statistics for" not in caplog.text
|
||||
assert "Statistics already compiled" not in caplog.text
|
||||
|
||||
zero = dt_util.utcnow()
|
||||
period1 = dt_util.as_utc(dt_util.parse_datetime("2022-10-03 00:00:00"))
|
||||
period2 = dt_util.as_utc(dt_util.parse_datetime("2022-10-03 23:00:00"))
|
||||
period3 = dt_util.as_utc(dt_util.parse_datetime("2022-10-04 00:00:00"))
|
||||
period4 = dt_util.as_utc(dt_util.parse_datetime("2022-10-04 23:00:00"))
|
||||
period5 = dt_util.as_utc(dt_util.parse_datetime("2022-10-05 00:00:00"))
|
||||
period6 = dt_util.as_utc(dt_util.parse_datetime("2022-10-05 23:00:00"))
|
||||
|
||||
external_statistics = (
|
||||
{
|
||||
"start": period1,
|
||||
"last_reset": None,
|
||||
"state": 0,
|
||||
"sum": 2,
|
||||
},
|
||||
{
|
||||
"start": period2,
|
||||
"last_reset": None,
|
||||
"state": 1,
|
||||
"sum": 3,
|
||||
},
|
||||
{
|
||||
"start": period3,
|
||||
"last_reset": None,
|
||||
"state": 2,
|
||||
"sum": 4,
|
||||
},
|
||||
{
|
||||
"start": period4,
|
||||
"last_reset": None,
|
||||
"state": 3,
|
||||
"sum": 5,
|
||||
},
|
||||
{
|
||||
"start": period5,
|
||||
"last_reset": None,
|
||||
"state": 4,
|
||||
"sum": 6,
|
||||
},
|
||||
{
|
||||
"start": period6,
|
||||
"last_reset": None,
|
||||
"state": 5,
|
||||
"sum": 7,
|
||||
},
|
||||
)
|
||||
external_metadata1 = {
|
||||
"has_mean": False,
|
||||
"has_sum": True,
|
||||
"name": "Total imported energy 1",
|
||||
"source": "test",
|
||||
"statistic_id": "test:total_energy_import2",
|
||||
"unit_of_measurement": "kWh",
|
||||
}
|
||||
external_metadata2 = {
|
||||
"has_mean": False,
|
||||
"has_sum": True,
|
||||
"name": "Total imported energy 2",
|
||||
"source": "test",
|
||||
"statistic_id": "test:total_energy_import1",
|
||||
"unit_of_measurement": "kWh",
|
||||
}
|
||||
|
||||
async_add_external_statistics(hass, external_metadata1, external_statistics)
|
||||
async_add_external_statistics(hass, external_metadata2, external_statistics)
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
zero,
|
||||
period="day",
|
||||
statistic_ids={"test:total_energy_import1", "test:total_energy_import2"},
|
||||
)
|
||||
day1_start = dt_util.as_utc(dt_util.parse_datetime("2022-10-03 00:00:00"))
|
||||
day1_end = dt_util.as_utc(dt_util.parse_datetime("2022-10-04 00:00:00"))
|
||||
day2_start = dt_util.as_utc(dt_util.parse_datetime("2022-10-04 00:00:00"))
|
||||
day2_end = dt_util.as_utc(dt_util.parse_datetime("2022-10-05 00:00:00"))
|
||||
day3_start = dt_util.as_utc(dt_util.parse_datetime("2022-10-05 00:00:00"))
|
||||
day3_end = dt_util.as_utc(dt_util.parse_datetime("2022-10-06 00:00:00"))
|
||||
expected_stats_inner = [
|
||||
{
|
||||
"start": day1_start.timestamp(),
|
||||
"end": day1_end.timestamp(),
|
||||
"last_reset": None,
|
||||
"state": 1.0,
|
||||
"sum": 3.0,
|
||||
},
|
||||
{
|
||||
"start": day2_start.timestamp(),
|
||||
"end": day2_end.timestamp(),
|
||||
"last_reset": None,
|
||||
"state": 3.0,
|
||||
"sum": 5.0,
|
||||
},
|
||||
{
|
||||
"start": day3_start.timestamp(),
|
||||
"end": day3_end.timestamp(),
|
||||
"last_reset": None,
|
||||
"state": 5.0,
|
||||
"sum": 7.0,
|
||||
},
|
||||
]
|
||||
expected_stats = {
|
||||
"test:total_energy_import1": expected_stats_inner,
|
||||
"test:total_energy_import2": expected_stats_inner,
|
||||
}
|
||||
assert stats == expected_stats
|
||||
|
||||
# Get change
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=period1,
|
||||
statistic_ids={"test:total_energy_import1", "test:total_energy_import2"},
|
||||
period="day",
|
||||
types={"change"},
|
||||
)
|
||||
expected_inner = [
|
||||
{
|
||||
"start": day1_start.timestamp(),
|
||||
"end": day1_end.timestamp(),
|
||||
"change": 3.0,
|
||||
},
|
||||
{
|
||||
"start": day2_start.timestamp(),
|
||||
"end": day2_end.timestamp(),
|
||||
"change": 2.0,
|
||||
},
|
||||
{
|
||||
"start": day3_start.timestamp(),
|
||||
"end": day3_end.timestamp(),
|
||||
"change": 2.0,
|
||||
},
|
||||
]
|
||||
assert stats == {
|
||||
"test:total_energy_import1": expected_inner,
|
||||
"test:total_energy_import2": expected_inner,
|
||||
}
|
||||
|
||||
# Get data with start during the first period
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=period1 + timedelta(hours=1),
|
||||
statistic_ids={"test:total_energy_import1", "test:total_energy_import2"},
|
||||
period="day",
|
||||
)
|
||||
assert stats == expected_stats
|
||||
|
||||
# Get data with end during the third period
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=zero,
|
||||
end_time=period6 - timedelta(hours=1),
|
||||
statistic_ids={"test:total_energy_import1", "test:total_energy_import2"},
|
||||
period="day",
|
||||
)
|
||||
assert stats == expected_stats
|
||||
|
||||
# Try to get data for entities which do not exist
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=zero,
|
||||
statistic_ids={
|
||||
"not",
|
||||
"the",
|
||||
"same",
|
||||
"test:total_energy_import1",
|
||||
"test:total_energy_import2",
|
||||
},
|
||||
period="day",
|
||||
)
|
||||
assert stats == expected_stats
|
||||
|
||||
# Use 5minute to ensure table switch works
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=zero,
|
||||
statistic_ids=[
|
||||
"test:total_energy_import1",
|
||||
"with_other",
|
||||
"test:total_energy_import2",
|
||||
],
|
||||
period="5minute",
|
||||
)
|
||||
assert stats == {}
|
||||
|
||||
# Ensure future date has not data
|
||||
future = dt_util.as_utc(dt_util.parse_datetime("2221-11-01 00:00:00"))
|
||||
stats = statistics_during_period(
|
||||
hass, start_time=future, end_time=future, period="day"
|
||||
)
|
||||
assert stats == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_weekly_statistics_mean(
|
||||
@ -1428,6 +1658,7 @@ async def test_weekly_statistics_mean(
|
||||
assert stats == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_weekly_statistics_sum(
|
||||
@ -1608,6 +1839,7 @@ async def test_weekly_statistics_sum(
|
||||
assert stats == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2021-08-01 00:00:00+00:00")
|
||||
async def test_monthly_statistics_sum(
|
||||
@ -1914,20 +2146,43 @@ def test_cache_key_for_generate_max_mean_min_statistic_in_sub_period_stmt() -> N
|
||||
assert cache_key_1 != cache_key_3
|
||||
|
||||
|
||||
def test_cache_key_for_generate_statistics_at_time_stmt() -> None:
|
||||
"""Test cache key for _generate_statistics_at_time_stmt."""
|
||||
stmt = _generate_statistics_at_time_stmt(StatisticsShortTerm, {0}, 0.0, set())
|
||||
def test_cache_key_for_generate_statistics_at_time_stmt_group_by() -> None:
|
||||
"""Test cache key for _generate_statistics_at_time_stmt_group_by."""
|
||||
stmt = _generate_statistics_at_time_stmt_group_by(
|
||||
StatisticsShortTerm, {0}, 0.0, set()
|
||||
)
|
||||
cache_key_1 = stmt._generate_cache_key()
|
||||
stmt2 = _generate_statistics_at_time_stmt(StatisticsShortTerm, {0}, 0.0, set())
|
||||
stmt2 = _generate_statistics_at_time_stmt_group_by(
|
||||
StatisticsShortTerm, {0}, 0.0, set()
|
||||
)
|
||||
cache_key_2 = stmt2._generate_cache_key()
|
||||
assert cache_key_1 == cache_key_2
|
||||
stmt3 = _generate_statistics_at_time_stmt(
|
||||
stmt3 = _generate_statistics_at_time_stmt_group_by(
|
||||
StatisticsShortTerm, {0}, 0.0, {"sum", "mean"}
|
||||
)
|
||||
cache_key_3 = stmt3._generate_cache_key()
|
||||
assert cache_key_1 != cache_key_3
|
||||
|
||||
|
||||
def test_cache_key_for_generate_statistics_at_time_stmt_dependent_sub_query() -> None:
|
||||
"""Test cache key for _generate_statistics_at_time_stmt_dependent_sub_query."""
|
||||
stmt = _generate_statistics_at_time_stmt_dependent_sub_query(
|
||||
StatisticsShortTerm, {0}, 0.0, set()
|
||||
)
|
||||
cache_key_1 = stmt._generate_cache_key()
|
||||
stmt2 = _generate_statistics_at_time_stmt_dependent_sub_query(
|
||||
StatisticsShortTerm, {0}, 0.0, set()
|
||||
)
|
||||
cache_key_2 = stmt2._generate_cache_key()
|
||||
assert cache_key_1 == cache_key_2
|
||||
stmt3 = _generate_statistics_at_time_stmt_dependent_sub_query(
|
||||
StatisticsShortTerm, {0}, 0.0, {"sum", "mean"}
|
||||
)
|
||||
cache_key_3 = stmt3._generate_cache_key()
|
||||
assert cache_key_1 != cache_key_3
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_change(
|
||||
@ -2263,6 +2518,392 @@ async def test_change(
|
||||
assert stats == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_change_multiple(
|
||||
hass: HomeAssistant,
|
||||
setup_recorder: None,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
timezone,
|
||||
) -> None:
|
||||
"""Test deriving change from sum statistic."""
|
||||
await hass.config.async_set_time_zone(timezone)
|
||||
await async_wait_recording_done(hass)
|
||||
assert "Compiling statistics for" not in caplog.text
|
||||
assert "Statistics already compiled" not in caplog.text
|
||||
|
||||
zero = dt_util.utcnow()
|
||||
period1 = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 00:00:00"))
|
||||
period2 = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 01:00:00"))
|
||||
period3 = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 02:00:00"))
|
||||
period4 = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 03:00:00"))
|
||||
|
||||
external_statistics = (
|
||||
{
|
||||
"start": period1,
|
||||
"last_reset": None,
|
||||
"state": 0,
|
||||
"sum": 2,
|
||||
},
|
||||
{
|
||||
"start": period2,
|
||||
"last_reset": None,
|
||||
"state": 1,
|
||||
"sum": 3,
|
||||
},
|
||||
{
|
||||
"start": period3,
|
||||
"last_reset": None,
|
||||
"state": 2,
|
||||
"sum": 5,
|
||||
},
|
||||
{
|
||||
"start": period4,
|
||||
"last_reset": None,
|
||||
"state": 3,
|
||||
"sum": 8,
|
||||
},
|
||||
)
|
||||
external_metadata1 = {
|
||||
"has_mean": False,
|
||||
"has_sum": True,
|
||||
"name": "Total imported energy",
|
||||
"source": "recorder",
|
||||
"statistic_id": "sensor.total_energy_import1",
|
||||
"unit_of_measurement": "kWh",
|
||||
}
|
||||
external_metadata2 = {
|
||||
"has_mean": False,
|
||||
"has_sum": True,
|
||||
"name": "Total imported energy",
|
||||
"source": "recorder",
|
||||
"statistic_id": "sensor.total_energy_import2",
|
||||
"unit_of_measurement": "kWh",
|
||||
}
|
||||
async_import_statistics(hass, external_metadata1, external_statistics)
|
||||
async_import_statistics(hass, external_metadata2, external_statistics)
|
||||
await async_wait_recording_done(hass)
|
||||
# Get change from far in the past
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
zero,
|
||||
period="hour",
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
types={"change"},
|
||||
)
|
||||
hour1_start = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 00:00:00"))
|
||||
hour1_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 01:00:00"))
|
||||
hour2_start = hour1_end
|
||||
hour2_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 02:00:00"))
|
||||
hour3_start = hour2_end
|
||||
hour3_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 03:00:00"))
|
||||
hour4_start = hour3_end
|
||||
hour4_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 04:00:00"))
|
||||
expected_inner = [
|
||||
{
|
||||
"start": hour1_start.timestamp(),
|
||||
"end": hour1_end.timestamp(),
|
||||
"change": 2.0,
|
||||
},
|
||||
{
|
||||
"start": hour2_start.timestamp(),
|
||||
"end": hour2_end.timestamp(),
|
||||
"change": 1.0,
|
||||
},
|
||||
{
|
||||
"start": hour3_start.timestamp(),
|
||||
"end": hour3_end.timestamp(),
|
||||
"change": 2.0,
|
||||
},
|
||||
{
|
||||
"start": hour4_start.timestamp(),
|
||||
"end": hour4_end.timestamp(),
|
||||
"change": 3.0,
|
||||
},
|
||||
]
|
||||
expected_stats = {
|
||||
"sensor.total_energy_import1": expected_inner,
|
||||
"sensor.total_energy_import2": expected_inner,
|
||||
}
|
||||
assert stats == expected_stats
|
||||
|
||||
# Get change + sum from far in the past
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
zero,
|
||||
period="hour",
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
types={"change", "sum"},
|
||||
)
|
||||
hour1_start = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 00:00:00"))
|
||||
hour1_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 01:00:00"))
|
||||
hour2_start = hour1_end
|
||||
hour2_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 02:00:00"))
|
||||
hour3_start = hour2_end
|
||||
hour3_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 03:00:00"))
|
||||
hour4_start = hour3_end
|
||||
hour4_end = dt_util.as_utc(dt_util.parse_datetime("2023-05-08 04:00:00"))
|
||||
expected_inner = [
|
||||
{
|
||||
"start": hour1_start.timestamp(),
|
||||
"end": hour1_end.timestamp(),
|
||||
"change": 2.0,
|
||||
"sum": 2.0,
|
||||
},
|
||||
{
|
||||
"start": hour2_start.timestamp(),
|
||||
"end": hour2_end.timestamp(),
|
||||
"change": 1.0,
|
||||
"sum": 3.0,
|
||||
},
|
||||
{
|
||||
"start": hour3_start.timestamp(),
|
||||
"end": hour3_end.timestamp(),
|
||||
"change": 2.0,
|
||||
"sum": 5.0,
|
||||
},
|
||||
{
|
||||
"start": hour4_start.timestamp(),
|
||||
"end": hour4_end.timestamp(),
|
||||
"change": 3.0,
|
||||
"sum": 8.0,
|
||||
},
|
||||
]
|
||||
expected_stats_change_sum = {
|
||||
"sensor.total_energy_import1": expected_inner,
|
||||
"sensor.total_energy_import2": expected_inner,
|
||||
}
|
||||
assert stats == expected_stats_change_sum
|
||||
|
||||
# Get change from far in the past with unit conversion
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour1_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
units={"energy": "Wh"},
|
||||
)
|
||||
expected_inner = [
|
||||
{
|
||||
"start": hour1_start.timestamp(),
|
||||
"end": hour1_end.timestamp(),
|
||||
"change": 2.0 * 1000,
|
||||
},
|
||||
{
|
||||
"start": hour2_start.timestamp(),
|
||||
"end": hour2_end.timestamp(),
|
||||
"change": 1.0 * 1000,
|
||||
},
|
||||
{
|
||||
"start": hour3_start.timestamp(),
|
||||
"end": hour3_end.timestamp(),
|
||||
"change": 2.0 * 1000,
|
||||
},
|
||||
{
|
||||
"start": hour4_start.timestamp(),
|
||||
"end": hour4_end.timestamp(),
|
||||
"change": 3.0 * 1000,
|
||||
},
|
||||
]
|
||||
expected_stats_wh = {
|
||||
"sensor.total_energy_import1": expected_inner,
|
||||
"sensor.total_energy_import2": expected_inner,
|
||||
}
|
||||
assert stats == expected_stats_wh
|
||||
|
||||
# Get change from far in the past with implicit unit conversion
|
||||
hass.states.async_set(
|
||||
"sensor.total_energy_import1", "unknown", {"unit_of_measurement": "MWh"}
|
||||
)
|
||||
hass.states.async_set(
|
||||
"sensor.total_energy_import2", "unknown", {"unit_of_measurement": "MWh"}
|
||||
)
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour1_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
expected_inner = [
|
||||
{
|
||||
"start": hour1_start.timestamp(),
|
||||
"end": hour1_end.timestamp(),
|
||||
"change": 2.0 / 1000,
|
||||
},
|
||||
{
|
||||
"start": hour2_start.timestamp(),
|
||||
"end": hour2_end.timestamp(),
|
||||
"change": 1.0 / 1000,
|
||||
},
|
||||
{
|
||||
"start": hour3_start.timestamp(),
|
||||
"end": hour3_end.timestamp(),
|
||||
"change": 2.0 / 1000,
|
||||
},
|
||||
{
|
||||
"start": hour4_start.timestamp(),
|
||||
"end": hour4_end.timestamp(),
|
||||
"change": 3.0 / 1000,
|
||||
},
|
||||
]
|
||||
expected_stats_mwh = {
|
||||
"sensor.total_energy_import1": expected_inner,
|
||||
"sensor.total_energy_import2": expected_inner,
|
||||
}
|
||||
assert stats == expected_stats_mwh
|
||||
hass.states.async_remove("sensor.total_energy_import1")
|
||||
hass.states.async_remove("sensor.total_energy_import2")
|
||||
|
||||
# Get change from the first recorded hour
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour1_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == expected_stats
|
||||
|
||||
# Get change from the first recorded hour with unit conversion
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour1_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
units={"energy": "Wh"},
|
||||
)
|
||||
assert stats == expected_stats_wh
|
||||
|
||||
# Get change from the first recorded hour with implicit unit conversion
|
||||
hass.states.async_set(
|
||||
"sensor.total_energy_import1", "unknown", {"unit_of_measurement": "MWh"}
|
||||
)
|
||||
hass.states.async_set(
|
||||
"sensor.total_energy_import2", "unknown", {"unit_of_measurement": "MWh"}
|
||||
)
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour1_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == expected_stats_mwh
|
||||
hass.states.async_remove("sensor.total_energy_import1")
|
||||
hass.states.async_remove("sensor.total_energy_import2")
|
||||
|
||||
# Get change from the second recorded hour
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour2_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == {
|
||||
"sensor.total_energy_import1": expected_stats["sensor.total_energy_import1"][
|
||||
1:4
|
||||
],
|
||||
"sensor.total_energy_import2": expected_stats["sensor.total_energy_import2"][
|
||||
1:4
|
||||
],
|
||||
}
|
||||
|
||||
# Get change from the second recorded hour with unit conversion
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour2_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
units={"energy": "Wh"},
|
||||
)
|
||||
assert stats == {
|
||||
"sensor.total_energy_import1": expected_stats_wh["sensor.total_energy_import1"][
|
||||
1:4
|
||||
],
|
||||
"sensor.total_energy_import2": expected_stats_wh["sensor.total_energy_import2"][
|
||||
1:4
|
||||
],
|
||||
}
|
||||
|
||||
# Get change from the second recorded hour with implicit unit conversion
|
||||
hass.states.async_set(
|
||||
"sensor.total_energy_import1", "unknown", {"unit_of_measurement": "MWh"}
|
||||
)
|
||||
hass.states.async_set(
|
||||
"sensor.total_energy_import2", "unknown", {"unit_of_measurement": "MWh"}
|
||||
)
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour2_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == {
|
||||
"sensor.total_energy_import1": expected_stats_mwh[
|
||||
"sensor.total_energy_import1"
|
||||
][1:4],
|
||||
"sensor.total_energy_import2": expected_stats_mwh[
|
||||
"sensor.total_energy_import2"
|
||||
][1:4],
|
||||
}
|
||||
hass.states.async_remove("sensor.total_energy_import1")
|
||||
hass.states.async_remove("sensor.total_energy_import2")
|
||||
|
||||
# Get change from the second until the third recorded hour
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour2_start,
|
||||
end_time=hour4_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == {
|
||||
"sensor.total_energy_import1": expected_stats["sensor.total_energy_import1"][
|
||||
1:3
|
||||
],
|
||||
"sensor.total_energy_import2": expected_stats["sensor.total_energy_import2"][
|
||||
1:3
|
||||
],
|
||||
}
|
||||
|
||||
# Get change from the fourth recorded hour
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=hour4_start,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == {
|
||||
"sensor.total_energy_import1": expected_stats["sensor.total_energy_import1"][
|
||||
3:4
|
||||
],
|
||||
"sensor.total_energy_import2": expected_stats["sensor.total_energy_import2"][
|
||||
3:4
|
||||
],
|
||||
}
|
||||
|
||||
# Test change with a far future start date
|
||||
future = dt_util.as_utc(dt_util.parse_datetime("2221-11-01 00:00:00"))
|
||||
stats = statistics_during_period(
|
||||
hass,
|
||||
start_time=future,
|
||||
statistic_ids={"sensor.total_energy_import1", "sensor.total_energy_import2"},
|
||||
period="hour",
|
||||
types={"change"},
|
||||
)
|
||||
assert stats == {}
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("multiple_start_time_chunk_sizes")
|
||||
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
|
||||
@pytest.mark.freeze_time("2022-10-01 00:00:00+00:00")
|
||||
async def test_change_with_none(
|
||||
|
@ -417,7 +417,12 @@ def test_supported_mysql(caplog: pytest.LogCaptureFixture, mysql_version) -> Non
|
||||
|
||||
dbapi_connection = MagicMock(cursor=_make_cursor_mock)
|
||||
|
||||
util.setup_connection_for_dialect(instance_mock, "mysql", dbapi_connection, True)
|
||||
database_engine = util.setup_connection_for_dialect(
|
||||
instance_mock, "mysql", dbapi_connection, True
|
||||
)
|
||||
assert database_engine is not None
|
||||
assert database_engine.optimizer.slow_range_in_select is False
|
||||
assert database_engine.optimizer.slow_dependent_subquery is True
|
||||
|
||||
assert "minimum supported version" not in caplog.text
|
||||
|
||||
@ -502,6 +507,7 @@ def test_supported_pgsql(caplog: pytest.LogCaptureFixture, pgsql_version) -> Non
|
||||
assert "minimum supported version" not in caplog.text
|
||||
assert database_engine is not None
|
||||
assert database_engine.optimizer.slow_range_in_select is True
|
||||
assert database_engine.optimizer.slow_dependent_subquery is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -583,6 +589,7 @@ def test_supported_sqlite(caplog: pytest.LogCaptureFixture, sqlite_version) -> N
|
||||
assert "minimum supported version" not in caplog.text
|
||||
assert database_engine is not None
|
||||
assert database_engine.optimizer.slow_range_in_select is False
|
||||
assert database_engine.optimizer.slow_dependent_subquery is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -675,6 +682,7 @@ async def test_issue_for_mariadb_with_MDEV_25020(
|
||||
|
||||
assert database_engine is not None
|
||||
assert database_engine.optimizer.slow_range_in_select is True
|
||||
assert database_engine.optimizer.slow_dependent_subquery is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@ -731,6 +739,7 @@ async def test_no_issue_for_mariadb_with_MDEV_25020(
|
||||
|
||||
assert database_engine is not None
|
||||
assert database_engine.optimizer.slow_range_in_select is False
|
||||
assert database_engine.optimizer.slow_dependent_subquery is False
|
||||
|
||||
|
||||
@pytest.mark.skip_on_db_engine(["mysql", "postgresql"])
|
||||
|