Optimize fetching weekly/monthly/yearly statistics (#87747)

* Optimize fetching statistics

* speed up

* avoid double groupby

* avoid another loop

* tweak flow

* fixes

* tweak

* avoid a new dt object in the cache for week/month
J. Nick Koston 2023-02-09 20:22:32 -06:00 committed by GitHub
parent 9dd806278b
commit ea356ad260
3 changed files with 163 additions and 119 deletions
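
The "avoid a new dt object in the cache" items above pay off because the reducers walk a time-sorted stream and compare neighbouring timestamps pairwise, so every timestamp is localized twice; a small per-call lru_cache turns the second localization into a hit instead of building a fresh datetime/date object. A minimal standalone sketch of the idea (illustrative names, not the PR's exact code):

from datetime import date, datetime, timedelta, timezone
from functools import lru_cache

LOCAL_TZ = timezone(timedelta(hours=-6))  # stand-in for the configured time zone

@lru_cache(maxsize=6)
def _local_date(ts: datetime) -> date:
    return ts.astimezone(LOCAL_TZ).date()

def same_day(t1: datetime, t2: datetime) -> bool:
    return _local_date(t1) == _local_date(t2)

hours = [datetime(2023, 2, 9, h, tzinfo=timezone.utc) for h in range(4)]
for prev, cur in zip(hours, hours[1:]):
    same_day(prev, cur)
print(_local_date.cache_info().hits)  # 2: the two middle timestamps are reused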

homeassistant/components/energy/websocket_api.py

@@ -356,17 +356,19 @@ async def ws_get_fossil_energy_consumption(
]
elif msg["period"] == "day":
_same_day, _day_start_end = recorder.statistics.reduce_day_factory()
reduced_fossil_energy = _reduce_deltas(
fossil_energy,
recorder.statistics.same_day,
recorder.statistics.day_start_end,
_same_day,
_day_start_end,
timedelta(days=1),
)
else:
_same_month, _month_start_end = recorder.statistics.reduce_month_factory()
reduced_fossil_energy = _reduce_deltas(
fossil_energy,
recorder.statistics.same_month,
recorder.statistics.month_start_end,
_same_month,
_month_start_end,
timedelta(days=1),
)
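
On the caller side the new pattern is: build the closures once per request, then hand them to the reducer so every row in that request shares the same small caches. A toy end-to-end sketch of that flow (_reduce_deltas here is a simplified stand-in, not the real implementation):

from datetime import datetime, timedelta, timezone
from functools import lru_cache

LOCAL = timezone(timedelta(hours=-6))  # stand-in for the configured time zone

def reduce_day_factory():
    """Per-request closures sharing one small cache (simplified)."""
    local_day = lru_cache(maxsize=6)(lambda t: t.astimezone(LOCAL).date())

    def same_day(t1: datetime, t2: datetime) -> bool:
        return local_day(t1) == local_day(t2)

    return same_day

def _reduce_deltas(deltas, same_period):
    """Toy stand-in for the real reducer: merge consecutive same-period deltas."""
    out = []
    for when, value in deltas:
        if out and same_period(out[-1][0], when):
            out[-1][1] += value
        else:
            out.append([when, value])
    return out

deltas = [(datetime(2023, 2, 9, h, tzinfo=timezone.utc), 1.0) for h in (10, 11, 12)]
_same_day = reduce_day_factory()  # built once per request; its cache dies with it
print(_reduce_deltas(deltas, _same_day))  # one merged bucket with value 3.0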

homeassistant/components/recorder/statistics.py

@@ -5,8 +5,8 @@ from collections import defaultdict
from collections.abc import Callable, Iterable, Mapping, Sequence
import contextlib
import dataclasses
from datetime import datetime, timedelta
from functools import partial
from datetime import date, datetime, timedelta
from functools import lru_cache, partial
from itertools import chain, groupby
import json
import logging
@@ -197,15 +197,10 @@ def _get_statistic_to_display_unit_converter(
statistic_unit: str | None,
state_unit: str | None,
requested_units: dict[str, str] | None,
) -> Callable[[float | None], float | None]:
) -> Callable[[float | None], float | None] | None:
"""Prepare a converter from the statistics unit to display unit."""
def no_conversion(val: float | None) -> float | None:
"""Return val."""
return val
if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit)) is None:
return no_conversion
return None
display_unit: str | None
unit_class = converter.UNIT_CLASS
@@ -216,22 +211,20 @@
if display_unit not in converter.VALID_UNITS:
# Guard against invalid state unit in the DB
return no_conversion
return None
def from_normalized_unit(
val: float | None, conv: type[BaseUnitConverter], from_unit: str, to_unit: str
) -> float | None:
if display_unit == statistic_unit:
return None
convert = converter.convert
def _from_normalized_unit(val: float | None) -> float | None:
"""Return val."""
if val is None:
return val
return conv.convert(val, from_unit=from_unit, to_unit=to_unit)
return convert(val, statistic_unit, display_unit)
return partial(
from_normalized_unit,
conv=converter,
from_unit=statistic_unit,
to_unit=display_unit,
)
return _from_normalized_unit
def _get_display_to_statistic_unit_converter(
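
Returning None instead of a no-op closure lets hot loops pay one truthiness check per statistic rather than a Python function call per value. A minimal sketch of the shape, with a made-up factor table standing in for STATISTIC_UNIT_TO_UNIT_CONVERTER:

from collections.abc import Callable

_FACTORS: dict[tuple[str, str], float] = {("W", "kW"): 0.001}  # hypothetical registry

def make_converter(
    from_unit: str, to_unit: str
) -> Callable[[float | None], float | None] | None:
    """Return a converter, or None when no conversion is needed."""
    if from_unit == to_unit or (from_unit, to_unit) not in _FACTORS:
        return None
    factor = _FACTORS[(from_unit, to_unit)]

    def _convert(val: float | None) -> float | None:
        return None if val is None else val * factor

    return _convert

convert = make_converter("W", "W")  # None: units already match
values = [1500.0, None, 2000.0]
print([convert(v) if convert else v for v in values])  # values pass through untouched
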
@@ -1087,20 +1080,35 @@ def _reduce_statistics(
return result
def same_day(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same date."""
date1 = dt_util.as_local(time1).date()
date2 = dt_util.as_local(time2).date()
return date1 == date2
def reduce_day_factory() -> (
tuple[
Callable[[datetime, datetime], bool],
Callable[[datetime], tuple[datetime, datetime]],
]
):
"""Return functions to match same day and day start end."""
# We create _as_local_cached in the closure in case the timezone changes
_as_local_cached = lru_cache(maxsize=6)(dt_util.as_local)
def _as_local_date(time: datetime) -> date:
"""Return the local date of a datetime."""
return dt_util.as_local(time).date()
def day_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (day) time is within."""
start = dt_util.as_utc(
dt_util.as_local(time).replace(hour=0, minute=0, second=0, microsecond=0)
)
end = start + timedelta(days=1)
return (start, end)
_as_local_date_cached = lru_cache(maxsize=6)(_as_local_date)
def _same_day(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same date."""
return _as_local_date_cached(time1) == _as_local_date_cached(time2)
def _day_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (day) time is within."""
start = dt_util.as_utc(
_as_local_cached(time).replace(hour=0, minute=0, second=0, microsecond=0)
)
end = start + timedelta(days=1)
return (start, end)
return _same_day, _day_start_end
def _reduce_statistics_per_day(
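
Creating the lru_cache inside the factory, rather than decorating a module-level function, means each reduction gets its own cache that dies with the request, so changing the configured time zone between requests can never serve stale localized dates. A tiny demonstration of that isolation (illustrative, not the PR's code):

from datetime import datetime, timedelta, timezone
from functools import lru_cache

def make_cached_local(tz):
    """Return a fresh per-call cache around the time zone conversion."""
    return lru_cache(maxsize=6)(lambda t: t.astimezone(tz))

first = make_cached_local(timezone(timedelta(hours=-6)))
second = make_cached_local(timezone.utc)  # e.g. after the user changes time zone

first(datetime(2023, 2, 9, 12, tzinfo=timezone.utc))
print(first.cache_info().currsize, second.cache_info().currsize)  # 1 0
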
@@ -1108,29 +1116,47 @@ def _reduce_statistics_per_day(
types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> dict[str, list[dict[str, Any]]]:
"""Reduce hourly statistics to daily statistics."""
return _reduce_statistics(stats, same_day, day_start_end, timedelta(days=1), types)
def same_week(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same year and week."""
date1 = dt_util.as_local(time1).date()
date2 = dt_util.as_local(time2).date()
return (date1.year, date1.isocalendar().week) == (
date2.year,
date2.isocalendar().week,
_same_day, _day_start_end = reduce_day_factory()
return _reduce_statistics(
stats, _same_day, _day_start_end, timedelta(days=1), types
)
def week_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (week) time is within."""
time_local = dt_util.as_local(time)
start_local = time_local.replace(
hour=0, minute=0, second=0, microsecond=0
) - timedelta(days=time_local.weekday())
start = dt_util.as_utc(start_local)
end = dt_util.as_utc(start_local + timedelta(days=7))
return (start, end)
def reduce_week_factory() -> (
tuple[
Callable[[datetime, datetime], bool],
Callable[[datetime], tuple[datetime, datetime]],
]
):
"""Return functions to match same week and week start end."""
# We create _as_local_cached in the closure in case the timezone changes
_as_local_cached = lru_cache(maxsize=6)(dt_util.as_local)
def _as_local_isocalendar(
time: datetime,
) -> tuple: # Need python3.11 for isocalendar typing
"""Return the local isocalendar of a datetime."""
return dt_util.as_local(time).isocalendar()
_as_local_isocalendar_cached = lru_cache(maxsize=6)(_as_local_isocalendar)
def _same_week(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same year and week."""
date1 = _as_local_isocalendar_cached(time1)
date2 = _as_local_isocalendar_cached(time2)
return (date1.year, date1.week) == (date2.year, date2.week) # type: ignore[attr-defined]
def _week_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (week) time is within."""
time_local = _as_local_cached(time)
start_local = time_local.replace(
hour=0, minute=0, second=0, microsecond=0
) - timedelta(days=time_local.weekday())
start = dt_util.as_utc(start_local)
end = dt_util.as_utc(start_local + timedelta(days=7))
return (start, end)
return _same_week, _week_start_end
def _reduce_statistics_per_week(
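
Comparing (year, week) through isocalendar() is subtler than it looks: at year boundaries the ISO year differs from the calendar year, and isocalendar() returns the ISO year, so both halves of the comparison stay consistent. A quick worked example:

from datetime import date

# Jan 1st 2021 belongs to ISO week 53 of ISO year 2020, not week 1 of 2021.
cal = date(2021, 1, 1).isocalendar()
print(cal.year, cal.week)  # 2020 53 (named attributes exist since Python 3.9)

monday = date(2020, 12, 28).isocalendar()  # the Monday of that same ISO week
assert (cal.year, cal.week) == (monday.year, monday.week)
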
@@ -1138,28 +1164,44 @@ def _reduce_statistics_per_week(
types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> dict[str, list[dict[str, Any]]]:
"""Reduce hourly statistics to weekly statistics."""
_same_week, _week_start_end = reduce_week_factory()
return _reduce_statistics(
stats, same_week, week_start_end, timedelta(days=7), types
stats, _same_week, _week_start_end, timedelta(days=7), types
)
def same_month(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same year and month."""
date1 = dt_util.as_local(time1).date()
date2 = dt_util.as_local(time2).date()
return (date1.year, date1.month) == (date2.year, date2.month)
def reduce_month_factory() -> (
tuple[
Callable[[datetime, datetime], bool],
Callable[[datetime], tuple[datetime, datetime]],
]
):
"""Return functions to match same month and month start end."""
# We create _as_local_cached in the closure in case the timezone changes
_as_local_cached = lru_cache(maxsize=6)(dt_util.as_local)
def _same_month(time1: datetime, time2: datetime) -> bool:
"""Return True if time1 and time2 are in the same year and month."""
if 2 < time1.day < 26 and 2 < time2.day < 26:
# No need to convert to local time if both dates are far
# enough from possible start or end of the month as time zones
# can't change more than 24 hours in a month.
return (time1.year, time1.month) == (time2.year, time2.month)
date1 = _as_local_cached(time1)
date2 = _as_local_cached(time2)
return (date1.year, date1.month) == (date2.year, date2.month)
def month_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (month) time is within."""
start_local = dt_util.as_local(time).replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
start = dt_util.as_utc(start_local)
end_local = (start_local + timedelta(days=31)).replace(day=1)
end = dt_util.as_utc(end_local)
return (start, end)
def _month_start_end(time: datetime) -> tuple[datetime, datetime]:
"""Return the start and end of the period (month) time is within."""
start_local = _as_local_cached(time).replace(
day=1, hour=0, minute=0, second=0, microsecond=0
)
start = dt_util.as_utc(start_local)
end_local = (start_local + timedelta(days=31)).replace(day=1)
end = dt_util.as_utc(end_local)
return (start, end)
return _same_month, _month_start_end
def _reduce_statistics_per_month(
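
The day-of-month guard works because localizing a timestamp shifts it by at most about a day, so only timestamps near a month boundary can land in a different month after conversion; anything on days 3 through 25 compares safely without localization. A worked example of a boundary timestamp the fast path must skip:

from datetime import datetime, timedelta, timezone

late = datetime(2023, 1, 31, 23, 0, tzinfo=timezone.utc)  # day 31: near the boundary
local = late.astimezone(timezone(timedelta(hours=2)))     # UTC+2 pushes it into February
print(late.month, local.month)  # 1 2 -- a naive month comparison would be wrong here

mid = datetime(2023, 1, 15, 12, 0, tzinfo=timezone.utc)   # day 15: safe in any time zone
assert 2 < mid.day < 26  # fast path applies; no localization needed
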
@@ -1167,9 +1209,9 @@ def _reduce_statistics_per_month(
types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
) -> dict[str, list[dict[str, Any]]]:
"""Reduce hourly statistics to monthly statistics."""
_same_month, _month_start_end = reduce_month_factory()
return _reduce_statistics(
stats, same_month, month_start_end, timedelta(days=31), types
stats, _same_month, _month_start_end, timedelta(days=31), types
)
@@ -1628,7 +1670,7 @@ def statistic_during_period(
state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
return {key: convert(value) for key, value in result.items()}
return {key: convert(value) if convert else value for key, value in result.items()}
def _statistics_during_period_with_session(
@@ -1952,77 +1994,76 @@ def _sorted_statistics_to_dict(
result: dict = defaultdict(list)
metadata = dict(_metadata.values())
need_stat_at_start_time: set[int] = set()
stats_at_start_time = {}
def no_conversion(val: float | None) -> float | None:
"""Return val."""
return val
start_time_ts = start_time.timestamp() if start_time else None
# Identify metadata IDs for which no data was available at the requested start time
stats_by_meta_id: dict[int, list[Row]] = {}
seen_statistic_ids: set[str] = set()
for meta_id, group in groupby(
stats,
lambda stat: stat.metadata_id, # type: ignore[no-any-return]
):
stats_list = stats_by_meta_id[meta_id] = list(group)
seen_statistic_ids.add(metadata[meta_id]["statistic_id"])
first_start_time_ts = stats_list[0].start_ts
if start_time_ts and first_start_time_ts > start_time_ts:
need_stat_at_start_time.add(meta_id)
# Set all statistic IDs to empty lists in result set to maintain the order
if statistic_ids is not None:
for stat_id in statistic_ids:
result[stat_id] = []
# Identify metadata IDs for which no data was available at the requested start time
for meta_id, group in groupby(
stats,
lambda stat: stat.metadata_id, # type: ignore[no-any-return]
):
first_start_time = dt_util.utc_from_timestamp(next(group).start_ts)
if start_time and first_start_time > start_time:
need_stat_at_start_time.add(meta_id)
# Only set the statistic ID if it is in the data to
# avoid having to do a second loop to remove the
# statistic IDs that are not in the data at the end
if stat_id in seen_statistic_ids:
result[stat_id] = []
# Fetch last known statistics for the needed metadata IDs
if need_stat_at_start_time:
assert start_time # Can not be None if need_stat_at_start_time is not empty
tmp = _statistics_at_time(
if tmp := _statistics_at_time(
session, need_stat_at_start_time, table, start_time, types
)
if tmp:
):
for stat in tmp:
stats_at_start_time[stat.metadata_id] = (stat,)
stats_by_meta_id[stat.metadata_id].insert(0, stat)
# Append all statistic entries, and optionally do unit conversion
table_duration = table.duration
table_duration_seconds = table.duration.total_seconds()
timestamp_to_datetime = dt_util.utc_from_timestamp
for meta_id, group in groupby(
stats,
lambda stat: stat.metadata_id, # type: ignore[no-any-return]
):
state_unit = unit = metadata[meta_id]["unit_of_measurement"]
statistic_id = metadata[meta_id]["statistic_id"]
if state := hass.states.get(statistic_id):
state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
for meta_id, stats_list in stats_by_meta_id.items():
metadata_by_id = metadata[meta_id]
statistic_id = metadata_by_id["statistic_id"]
if convert_units:
state_unit = unit = metadata_by_id["unit_of_measurement"]
if state := hass.states.get(statistic_id):
state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
else:
convert = no_conversion
ent_results = result[meta_id]
for db_state in chain(stats_at_start_time.get(meta_id, ()), group):
start = timestamp_to_datetime(db_state.start_ts)
convert = None
ent_results = result[statistic_id]
for db_state in stats_list:
start_ts = db_state.start_ts
row: dict[str, Any] = {
"start": start,
"end": start + table_duration,
"start": timestamp_to_datetime(start_ts),
"end": timestamp_to_datetime(start_ts + table_duration_seconds),
}
if "mean" in types:
row["mean"] = convert(db_state.mean)
row["mean"] = convert(db_state.mean) if convert else db_state.mean
if "min" in types:
row["min"] = convert(db_state.min)
row["min"] = convert(db_state.min) if convert else db_state.min
if "max" in types:
row["max"] = convert(db_state.max)
row["max"] = convert(db_state.max) if convert else db_state.max
if "last_reset" in types:
row["last_reset"] = timestamp_to_datetime_or_none(
db_state.last_reset_ts
)
if "state" in types:
row["state"] = convert(db_state.state)
row["state"] = convert(db_state.state) if convert else db_state.state
if "sum" in types:
row["sum"] = convert(db_state.sum)
row["sum"] = convert(db_state.sum) if convert else db_state.sum
ent_results.append(row)
# Filter out the empty lists if some states had 0 results.
return {metadata[key]["statistic_id"]: val for key, val in result.items() if val}
return result
def validate_statistics(hass: HomeAssistant) -> dict[str, list[ValidationIssue]]:
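
The rewrite above does a single groupby pass: it materializes each group into a list keyed by metadata_id, notes which IDs need a backfilled row at the requested start time, and later insert(0, ...)s that row instead of re-grouping or chain()-ing. A self-contained sketch of the shape (Row is a hypothetical stand-in for the SQLAlchemy result rows, which arrive sorted by metadata_id as the function name promises):

from dataclasses import dataclass
from itertools import groupby

@dataclass
class Row:  # hypothetical stand-in for a statistics result row
    metadata_id: int
    start_ts: float

rows = [Row(1, 200.0), Row(1, 300.0), Row(2, 150.0)]  # pre-sorted by metadata_id
start_time_ts = 150.0

# One pass: group, materialize, and flag IDs missing data at the requested start.
stats_by_meta_id: dict[int, list[Row]] = {}
need_stat_at_start_time: set[int] = set()
for meta_id, group in groupby(rows, key=lambda row: row.metadata_id):
    stats_list = stats_by_meta_id[meta_id] = list(group)
    if stats_list[0].start_ts > start_time_ts:
        need_stat_at_start_time.add(meta_id)

# Backfilled rows go to the front of the existing lists -- no second groupby.
for stat in [Row(1, 150.0)]:  # pretend result of _statistics_at_time()
    stats_by_meta_id[stat.metadata_id].insert(0, stat)

print(need_stat_at_start_time)                    # {1}
print([r.start_ts for r in stats_by_meta_id[1]])  # [150.0, 200.0, 300.0]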

homeassistant/util/dt.py

@@ -145,9 +145,10 @@ def as_local(dattim: dt.datetime) -> dt.datetime:
return dattim.astimezone(DEFAULT_TIME_ZONE)
def utc_from_timestamp(timestamp: float) -> dt.datetime:
"""Return a UTC time from a timestamp."""
return dt.datetime.utcfromtimestamp(timestamp).replace(tzinfo=UTC)
# We use a partial here to improve performance by avoiding the global lookup
# of UTC and the function call overhead.
utc_from_timestamp = partial(dt.datetime.fromtimestamp, tz=UTC)
"""Return a UTC time from a timestamp."""
def utc_to_timestamp(utc_dt: dt.datetime) -> float:
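
The swap relies on datetime.fromtimestamp with an explicit tz returning the same aware datetime that utcfromtimestamp(...).replace(tzinfo=UTC) produced, while partial binds tz=UTC once up front (utcfromtimestamp is also deprecated as of Python 3.12). A quick equivalence check:

import datetime as dt
from functools import partial

UTC = dt.timezone.utc
utc_from_timestamp = partial(dt.datetime.fromtimestamp, tz=UTC)

ts = 1675995752.0
new = utc_from_timestamp(ts)
old = dt.datetime.utcfromtimestamp(ts).replace(tzinfo=UTC)  # deprecated in 3.12
assert new == old and new.tzinfo is UTC
print(new)  # 2023-02-10 02:22:32+00:00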