Speed up generating large stats results (#119210)

* Speed up generating large stats results

* naming

* fix type

* fix type

* tweak

* tweak

* delete unused code

J. Nick Koston 2024-06-10 20:22:59 -05:00 committed by GitHub
parent 0149698002
commit 3308f07d4b

@@ -241,7 +241,8 @@ def _get_statistic_to_display_unit_converter(
     statistic_unit: str | None,
     state_unit: str | None,
     requested_units: dict[str, str] | None,
-) -> Callable[[float | None], float | None] | None:
+    allow_none: bool = True,
+) -> Callable[[float | None], float | None] | Callable[[float], float] | None:
     """Prepare a converter from the statistics unit to display unit."""
     if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit)) is None:
         return None
@@ -260,9 +261,11 @@ def _get_statistic_to_display_unit_converter(
     if display_unit == statistic_unit:
         return None
-    return converter.converter_factory_allow_none(
-        from_unit=statistic_unit, to_unit=display_unit
-    )
+    if allow_none:
+        return converter.converter_factory_allow_none(
+            from_unit=statistic_unit, to_unit=display_unit
+        )
+    return converter.converter_factory(from_unit=statistic_unit, to_unit=display_unit)


 def _get_display_to_statistic_unit_converter(
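
For context on the new allow_none flag: when the caller guards against None itself (as the row builders further down do with a walrus check), the converter no longer has to test every value. A minimal standalone sketch of the idea; the factory bodies are simplified stand-ins, not the real converters from homeassistant.util.unit_conversion:

from collections.abc import Callable

def converter_factory(ratio: float) -> Callable[[float], float]:
    """Stand-in for converter_factory: assumes the value is never None."""
    return lambda value: value * ratio

def converter_factory_allow_none(
    ratio: float,
) -> Callable[[float | None], float | None]:
    """Stand-in for converter_factory_allow_none: checks None on every call."""
    return lambda value: None if value is None else value * ratio

# With allow_none=False the hot loop hoists the None check to the call site,
# so the converter itself stays a bare multiplication.
convert = converter_factory(0.001)  # a hypothetical Wh -> kWh ratio
values = [1000.0, None, 2500.0]
assert [None if v is None else convert(v) for v in values] == [1.0, None, 2.5]
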
@@ -1760,13 +1763,11 @@ def _statistics_during_period_with_session(
     result = _sorted_statistics_to_dict(
         hass,
-        session,
         stats,
         statistic_ids,
         metadata,
         True,
         table,
-        start_time,
         units,
         types,
     )
@@ -1878,14 +1879,12 @@ def _get_last_statistics(
     # Return statistics combined with metadata
     return _sorted_statistics_to_dict(
         hass,
-        session,
         stats,
         statistic_ids,
         metadata,
         convert_units,
         table,
         None,
-        None,
         types,
     )
@@ -1993,14 +1992,12 @@ def get_latest_short_term_statistics_with_session(
     # Return statistics combined with metadata
     return _sorted_statistics_to_dict(
         hass,
-        session,
         stats,
         statistic_ids,
         metadata,
         False,
         StatisticsShortTerm,
         None,
-        None,
         types,
     )
@@ -2047,42 +2044,119 @@ def _statistics_at_time(
     return cast(Sequence[Row], execute_stmt_lambda_element(session, stmt))


-def _fast_build_sum_list(
-    stats_list: list[Row],
+def _build_sum_converted_stats(
+    db_rows: list[Row],
+    table_duration_seconds: float,
+    start_ts_idx: int,
+    sum_idx: int,
+    convert: Callable[[float | None], float | None] | Callable[[float], float],
+) -> list[StatisticsRow]:
+    """Build a list of sum statistics."""
+    return [
+        {
+            "start": (start_ts := db_row[start_ts_idx]),
+            "end": start_ts + table_duration_seconds,
+            "sum": None if (v := db_row[sum_idx]) is None else convert(v),
+        }
+        for db_row in db_rows
+    ]
+
+
+def _build_sum_stats(
+    db_rows: list[Row],
     table_duration_seconds: float,
-    convert: Callable | None,
     start_ts_idx: int,
     sum_idx: int,
 ) -> list[StatisticsRow]:
     """Build a list of sum statistics."""
-    if convert:
-        return [
-            {
-                "start": (start_ts := db_state[start_ts_idx]),
-                "end": start_ts + table_duration_seconds,
-                "sum": convert(db_state[sum_idx]),
-            }
-            for db_state in stats_list
-        ]
     return [
         {
-            "start": (start_ts := db_state[start_ts_idx]),
+            "start": (start_ts := db_row[start_ts_idx]),
             "end": start_ts + table_duration_seconds,
-            "sum": db_state[sum_idx],
+            "sum": db_row[sum_idx],
         }
-        for db_state in stats_list
+        for db_row in db_rows
     ]


-def _sorted_statistics_to_dict(  # noqa: C901
+def _build_stats(
+    db_rows: list[Row],
+    table_duration_seconds: float,
+    start_ts_idx: int,
+    mean_idx: int | None,
+    min_idx: int | None,
+    max_idx: int | None,
+    last_reset_ts_idx: int | None,
+    state_idx: int | None,
+    sum_idx: int | None,
+) -> list[StatisticsRow]:
+    """Build a list of statistics without unit conversion."""
+    result: list[StatisticsRow] = []
+    ent_results_append = result.append
+    for db_row in db_rows:
+        row: StatisticsRow = {
+            "start": (start_ts := db_row[start_ts_idx]),
+            "end": start_ts + table_duration_seconds,
+        }
+        if last_reset_ts_idx is not None:
+            row["last_reset"] = db_row[last_reset_ts_idx]
+        if mean_idx is not None:
+            row["mean"] = db_row[mean_idx]
+        if min_idx is not None:
+            row["min"] = db_row[min_idx]
+        if max_idx is not None:
+            row["max"] = db_row[max_idx]
+        if state_idx is not None:
+            row["state"] = db_row[state_idx]
+        if sum_idx is not None:
+            row["sum"] = db_row[sum_idx]
+        ent_results_append(row)
+    return result
+
+
+def _build_converted_stats(
+    db_rows: list[Row],
+    table_duration_seconds: float,
+    start_ts_idx: int,
+    mean_idx: int | None,
+    min_idx: int | None,
+    max_idx: int | None,
+    last_reset_ts_idx: int | None,
+    state_idx: int | None,
+    sum_idx: int | None,
+    convert: Callable[[float | None], float | None] | Callable[[float], float],
+) -> list[StatisticsRow]:
+    """Build a list of statistics with unit conversion."""
+    result: list[StatisticsRow] = []
+    ent_results_append = result.append
+    for db_row in db_rows:
+        row: StatisticsRow = {
+            "start": (start_ts := db_row[start_ts_idx]),
+            "end": start_ts + table_duration_seconds,
+        }
+        if last_reset_ts_idx is not None:
+            row["last_reset"] = db_row[last_reset_ts_idx]
+        if mean_idx is not None:
+            row["mean"] = None if (v := db_row[mean_idx]) is None else convert(v)
+        if min_idx is not None:
+            row["min"] = None if (v := db_row[min_idx]) is None else convert(v)
+        if max_idx is not None:
+            row["max"] = None if (v := db_row[max_idx]) is None else convert(v)
+        if state_idx is not None:
+            row["state"] = None if (v := db_row[state_idx]) is None else convert(v)
+        if sum_idx is not None:
+            row["sum"] = None if (v := db_row[sum_idx]) is None else convert(v)
+        ent_results_append(row)
+    return result
+
+
+def _sorted_statistics_to_dict(
     hass: HomeAssistant,
-    session: Session,
     stats: Sequence[Row[Any]],
     statistic_ids: set[str] | None,
     _metadata: dict[str, tuple[int, StatisticMetaData]],
     convert_units: bool,
     table: type[StatisticsBase],
-    start_time: datetime | None,
     units: dict[str, str] | None,
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
 ) -> dict[str, list[StatisticsRow]]:
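
As a rough, self-contained illustration of how these builders consume positional row tuples (plain tuples stand in for SQLAlchemy Row objects; the column order, bucket duration, and unit ratio are invented for the demo):

# Demo rows shaped like (start_ts, sum); the real code derives the indexes
# from a field map built off the SELECT statement.
rows = [(1700000000.0, 1000.0), (1700000300.0, None), (1700000600.0, 2500.0)]
table_duration_seconds = 300.0
start_ts_idx, sum_idx = 0, 1

def convert(value: float) -> float:
    """Hypothetical unit conversion (e.g. Wh -> kWh) used only for this demo."""
    return value / 1000

# Same shape as _build_sum_converted_stats: one list comprehension, with the
# walrus operator reusing each looked-up value for the None guard.
sum_stats = [
    {
        "start": (start_ts := row[start_ts_idx]),
        "end": start_ts + table_duration_seconds,
        "sum": None if (v := row[sum_idx]) is None else convert(v),
    }
    for row in rows
]
assert sum_stats[0]["sum"] == 1.0
assert sum_stats[1]["sum"] is None
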
@@ -2120,19 +2194,23 @@ def _sorted_statistics_to_dict(  # noqa: C901
     state_idx = field_map["state"] if "state" in types else None
     sum_idx = field_map["sum"] if "sum" in types else None
     sum_only = len(types) == 1 and sum_idx is not None
+    row_idxes = (mean_idx, min_idx, max_idx, last_reset_ts_idx, state_idx, sum_idx)
     # Append all statistic entries, and optionally do unit conversion
     table_duration_seconds = table.duration.total_seconds()
-    for meta_id, stats_list in stats_by_meta_id.items():
+    for meta_id, db_rows in stats_by_meta_id.items():
         metadata_by_id = metadata[meta_id]
         statistic_id = metadata_by_id["statistic_id"]
         if convert_units:
             state_unit = unit = metadata_by_id["unit_of_measurement"]
             if state := hass.states.get(statistic_id):
                 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
-            convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
+            convert = _get_statistic_to_display_unit_converter(
+                unit, state_unit, units, allow_none=False
+            )
         else:
             convert = None
+        build_args = (db_rows, table_duration_seconds, start_ts_idx)
         if sum_only:
             # This function is extremely flexible and can handle all types of
             # statistics, but in practice we only ever use a few combinations.
             #
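
The row_idxes and build_args tuples pack the shared positional arguments once per statistics series so they can be splatted into whichever specialized builder applies. A tiny self-contained demo of that call pattern, with illustrative values only:

# *-unpacking passes the packed tuples positionally, so the dispatch stays one
# short expression per branch while each builder keeps an explicit signature.
def build(rows: int, duration: float, start_idx: int, mean_idx: int | None, sum_idx: int | None) -> str:
    return f"{rows} rows, {duration}s buckets, columns ({start_idx}, {mean_idx}, {sum_idx})"

build_args = (288, 300.0, 0)  # shared args, packed once per statistic_id
row_idxes = (None, 3)         # per-request column indexes (mean, sum)

assert build(*build_args, *row_idxes) == build(288, 300.0, 0, None, 3)
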
@@ -2140,53 +2218,16 @@ def _sorted_statistics_to_dict(  # noqa: C901
             # For energy, we only need sum statistics, so we can optimize
             # this path to avoid the overhead of the more generic function.
             assert sum_idx is not None
-            result[statistic_id] = _fast_build_sum_list(
-                stats_list,
-                table_duration_seconds,
-                convert,
-                start_ts_idx,
-                sum_idx,
-            )
-            continue
-        ent_results_append = result[statistic_id].append
-        #
-        # The below loop is a red hot path for energy, and every
-        # optimization counts in here.
-        #
-        # Specifically, we want to avoid function calls,
-        # attribute lookups, and dict lookups as much as possible.
-        #
-        for db_state in stats_list:
-            row: StatisticsRow = {
-                "start": (start_ts := db_state[start_ts_idx]),
-                "end": start_ts + table_duration_seconds,
-            }
-            if last_reset_ts_idx is not None:
-                row["last_reset"] = db_state[last_reset_ts_idx]
             if convert:
-                if mean_idx is not None:
-                    row["mean"] = convert(db_state[mean_idx])
-                if min_idx is not None:
-                    row["min"] = convert(db_state[min_idx])
-                if max_idx is not None:
-                    row["max"] = convert(db_state[max_idx])
-                if state_idx is not None:
-                    row["state"] = convert(db_state[state_idx])
-                if sum_idx is not None:
-                    row["sum"] = convert(db_state[sum_idx])
+                _stats = _build_sum_converted_stats(*build_args, sum_idx, convert)
             else:
-                if mean_idx is not None:
-                    row["mean"] = db_state[mean_idx]
-                if min_idx is not None:
-                    row["min"] = db_state[min_idx]
-                if max_idx is not None:
-                    row["max"] = db_state[max_idx]
-                if state_idx is not None:
-                    row["state"] = db_state[state_idx]
-                if sum_idx is not None:
-                    row["sum"] = db_state[sum_idx]
-            ent_results_append(row)
+                _stats = _build_sum_stats(*build_args, sum_idx)
+        elif convert:
+            _stats = _build_converted_stats(*build_args, *row_idxes, convert)
+        else:
+            _stats = _build_stats(*build_args, *row_idxes)
+
+        result[statistic_id] = _stats

     return result
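
To gauge what moving the per-row branching out of the loop buys, a throwaway timeit harness along these lines can be run on synthetic rows; the column layout and row count are made up and no timings are claimed:

import timeit

ROWS = [(float(i), float(i)) for i in range(100_000)]  # synthetic (start, sum) rows
DURATION = 300.0
START_IDX, SUM_IDX = 0, 1
TYPES = {"sum"}  # the energy use case only ever asks for "sum"

def generic() -> list[dict[str, float]]:
    """Old shape: per-row dict build with per-field membership checks."""
    out = []
    append = out.append
    for row in ROWS:
        item = {"start": (s := row[START_IDX]), "end": s + DURATION}
        if "mean" in TYPES:
            item["mean"] = row[SUM_IDX]
        if "sum" in TYPES:
            item["sum"] = row[SUM_IDX]
        append(item)
    return out

def specialized() -> list[dict[str, float]]:
    """New shape: a sum-only list comprehension with no per-row branching."""
    return [
        {"start": (s := row[START_IDX]), "end": s + DURATION, "sum": row[SUM_IDX]}
        for row in ROWS
    ]

if __name__ == "__main__":
    for fn in (generic, specialized):
        print(fn.__name__, timeit.timeit(fn, number=10))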