diff --git a/homeassistant/components/energy/websocket_api.py b/homeassistant/components/energy/websocket_api.py
index 293d34ae8d8..d2b4e3b33d9 100644
--- a/homeassistant/components/energy/websocket_api.py
+++ b/homeassistant/components/energy/websocket_api.py
@@ -356,17 +356,19 @@ async def ws_get_fossil_energy_consumption(
         ]
     elif msg["period"] == "day":
+        _same_day, _day_start_end = recorder.statistics.reduce_day_factory()
         reduced_fossil_energy = _reduce_deltas(
             fossil_energy,
-            recorder.statistics.same_day,
-            recorder.statistics.day_start_end,
+            _same_day,
+            _day_start_end,
             timedelta(days=1),
         )
     else:
+        _same_month, _month_start_end = recorder.statistics.reduce_month_factory()
         reduced_fossil_energy = _reduce_deltas(
             fossil_energy,
-            recorder.statistics.same_month,
-            recorder.statistics.month_start_end,
+            _same_month,
+            _month_start_end,
             timedelta(days=1),
         )
diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py
index f0214b4143f..a6371f2e9a0 100644
--- a/homeassistant/components/recorder/statistics.py
+++ b/homeassistant/components/recorder/statistics.py
@@ -5,8 +5,8 @@ from collections import defaultdict
 from collections.abc import Callable, Iterable, Mapping, Sequence
 import contextlib
 import dataclasses
-from datetime import datetime, timedelta
-from functools import partial
+from datetime import date, datetime, timedelta
+from functools import lru_cache, partial
 from itertools import chain, groupby
 import json
 import logging
@@ -197,15 +197,10 @@ def _get_statistic_to_display_unit_converter(
     statistic_unit: str | None,
     state_unit: str | None,
     requested_units: dict[str, str] | None,
-) -> Callable[[float | None], float | None]:
+) -> Callable[[float | None], float | None] | None:
     """Prepare a converter from the statistics unit to display unit."""
-
-    def no_conversion(val: float | None) -> float | None:
-        """Return val."""
-        return val
-
     if (converter := STATISTIC_UNIT_TO_UNIT_CONVERTER.get(statistic_unit)) is None:
-        return no_conversion
+        return None
 
     display_unit: str | None
     unit_class = converter.UNIT_CLASS
@@ -216,22 +211,20 @@ def _get_statistic_to_display_unit_converter(
 
     if display_unit not in converter.VALID_UNITS:
         # Guard against invalid state unit in the DB
-        return no_conversion
+        return None
 
-    def from_normalized_unit(
-        val: float | None, conv: type[BaseUnitConverter], from_unit: str, to_unit: str
-    ) -> float | None:
+    if display_unit == statistic_unit:
+        return None
+
+    convert = converter.convert
+
+    def _from_normalized_unit(val: float | None) -> float | None:
         """Return val."""
         if val is None:
             return val
-        return conv.convert(val, from_unit=from_unit, to_unit=to_unit)
+        return convert(val, statistic_unit, display_unit)
 
-    return partial(
-        from_normalized_unit,
-        conv=converter,
-        from_unit=statistic_unit,
-        to_unit=display_unit,
-    )
+    return _from_normalized_unit
 
 
 def _get_display_to_statistic_unit_converter(
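Returning `None` instead of a `no_conversion` identity closure lets hot paths test `if convert` once instead of paying a Python function call for every value. A minimal sketch of the pattern, assuming a toy factor table (`_FACTORS` and `get_converter` are illustrative stand-ins, not Home Assistant's `STATISTIC_UNIT_TO_UNIT_CONVERTER` registry):

```python
from collections.abc import Callable

# Hypothetical unit -> scale-factor table standing in for the real converter
# registry; a missing entry means "no conversion known or needed".
_FACTORS: dict[str, float] = {"Wh": 0.001}  # Wh -> kWh


def get_converter(unit: str | None) -> Callable[[float | None], float | None] | None:
    """Return a converter, or None when conversion would be a no-op."""
    if unit is None or (factor := _FACTORS.get(unit)) is None:
        return None  # caller keeps the raw value after a single truthiness check

    def _convert(val: float | None) -> float | None:
        return val if val is None else val * factor

    return _convert


convert = get_converter("Wh")
values = [1000.0, None, 2500.0]
# The "convert(v) if convert else v" pattern from the diff: one branch per row
# instead of an identity-function call per row.
assert [convert(v) if convert else v for v in values] == [1.0, None, 2.5]
```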
@@ -1087,20 +1080,35 @@ def _reduce_statistics(
     return result
 
 
-def same_day(time1: datetime, time2: datetime) -> bool:
-    """Return True if time1 and time2 are in the same date."""
-    date1 = dt_util.as_local(time1).date()
-    date2 = dt_util.as_local(time2).date()
-    return date1 == date2
+def reduce_day_factory() -> (
+    tuple[
+        Callable[[datetime, datetime], bool],
+        Callable[[datetime], tuple[datetime, datetime]],
+    ]
+):
+    """Return functions to match same day and day start end."""
+    # We create _as_local_cached in the closure in case the timezone changes
+    _as_local_cached = lru_cache(maxsize=6)(dt_util.as_local)
 
+    def _as_local_date(time: datetime) -> date:
+        """Return the local date of a datetime."""
+        return dt_util.as_local(time).date()
 
-def day_start_end(time: datetime) -> tuple[datetime, datetime]:
-    """Return the start and end of the period (day) time is within."""
-    start = dt_util.as_utc(
-        dt_util.as_local(time).replace(hour=0, minute=0, second=0, microsecond=0)
-    )
-    end = start + timedelta(days=1)
-    return (start, end)
+    _as_local_date_cached = lru_cache(maxsize=6)(_as_local_date)
+
+    def _same_day(time1: datetime, time2: datetime) -> bool:
+        """Return True if time1 and time2 are in the same date."""
+        return _as_local_date_cached(time1) == _as_local_date_cached(time2)
+
+    def _day_start_end(time: datetime) -> tuple[datetime, datetime]:
+        """Return the start and end of the period (day) time is within."""
+        start = dt_util.as_utc(
+            _as_local_cached(time).replace(hour=0, minute=0, second=0, microsecond=0)
+        )
+        end = start + timedelta(days=1)
+        return (start, end)
+
+    return _same_day, _day_start_end
 
 
 def _reduce_statistics_per_day(
@@ -1108,29 +1116,47 @@ def _reduce_statistics_per_day(
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
 ) -> dict[str, list[dict[str, Any]]]:
     """Reduce hourly statistics to daily statistics."""
-
-    return _reduce_statistics(stats, same_day, day_start_end, timedelta(days=1), types)
-
-
-def same_week(time1: datetime, time2: datetime) -> bool:
-    """Return True if time1 and time2 are in the same year and week."""
-    date1 = dt_util.as_local(time1).date()
-    date2 = dt_util.as_local(time2).date()
-    return (date1.year, date1.isocalendar().week) == (
-        date2.year,
-        date2.isocalendar().week,
+    _same_day, _day_start_end = reduce_day_factory()
+    return _reduce_statistics(
+        stats, _same_day, _day_start_end, timedelta(days=1), types
     )
 
 
-def week_start_end(time: datetime) -> tuple[datetime, datetime]:
-    """Return the start and end of the period (week) time is within."""
-    time_local = dt_util.as_local(time)
-    start_local = time_local.replace(
-        hour=0, minute=0, second=0, microsecond=0
-    ) - timedelta(days=time_local.weekday())
-    start = dt_util.as_utc(start_local)
-    end = dt_util.as_utc(start_local + timedelta(days=7))
-    return (start, end)
+def reduce_week_factory() -> (
+    tuple[
+        Callable[[datetime, datetime], bool],
+        Callable[[datetime], tuple[datetime, datetime]],
+    ]
+):
+    """Return functions to match same week and week start end."""
+    # We create _as_local_cached in the closure in case the timezone changes
+    _as_local_cached = lru_cache(maxsize=6)(dt_util.as_local)
+
+    def _as_local_isocalendar(
+        time: datetime,
+    ) -> tuple:  # Need python3.11 for isocalendar typing
+        """Return the local isocalendar of a datetime."""
+        return dt_util.as_local(time).isocalendar()
+
+    _as_local_isocalendar_cached = lru_cache(maxsize=6)(_as_local_isocalendar)
+
+    def _same_week(time1: datetime, time2: datetime) -> bool:
+        """Return True if time1 and time2 are in the same year and week."""
+        date1 = _as_local_isocalendar_cached(time1)
+        date2 = _as_local_isocalendar_cached(time2)
+        return (date1.year, date1.week) == (date2.year, date2.week)  # type: ignore[attr-defined]
+
+    def _week_start_end(time: datetime) -> tuple[datetime, datetime]:
+        """Return the start and end of the period (week) time is within."""
+        time_local = _as_local_cached(time)
+        start_local = time_local.replace(
+            hour=0, minute=0, second=0, microsecond=0
+        ) - timedelta(days=time_local.weekday())
+        start = dt_util.as_utc(start_local)
+        end = dt_util.as_utc(start_local + timedelta(days=7))
+        return (start, end)
+
+    return _same_week, _week_start_end
 
 
 def _reduce_statistics_per_week(
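Both factories build their `lru_cache`-wrapped helpers inside the closure, so every reduction pass starts with empty caches and a time-zone change between passes can never replay stale localized values. A minimal sketch of the idea, with `LOCAL_TZ` and `as_local` as toy stand-ins for `dt_util`:

```python
from datetime import date, datetime, timedelta, timezone
from functools import lru_cache

LOCAL_TZ = timezone(timedelta(hours=2))  # stand-in for the configured time zone


def as_local(time: datetime) -> datetime:
    """Toy version of dt_util.as_local."""
    return time.astimezone(LOCAL_TZ)


def day_matcher_factory():
    """Return a _same_day-style predicate with caches scoped to this call."""

    @lru_cache(maxsize=6)  # period boundaries are re-compared only a few times
    def _as_local_date(time: datetime) -> date:
        return as_local(time).date()

    def _same_day(t1: datetime, t2: datetime) -> bool:
        return _as_local_date(t1) == _as_local_date(t2)

    return _same_day


same_day = day_matcher_factory()
t1 = datetime(2023, 3, 1, 23, 30, tzinfo=timezone.utc)  # 01:30 Mar 2 local
t2 = datetime(2023, 3, 2, 10, 0, tzinfo=timezone.utc)  # 12:00 Mar 2 local
assert same_day(t1, t2)
# A later factory call builds fresh caches, so changing LOCAL_TZ between
# reduction passes cannot leak previously cached local dates.
```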
@@ -1138,28 +1164,44 @@ def _reduce_statistics_per_week(
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
 ) -> dict[str, list[dict[str, Any]]]:
     """Reduce hourly statistics to weekly statistics."""
-
+    _same_week, _week_start_end = reduce_week_factory()
     return _reduce_statistics(
-        stats, same_week, week_start_end, timedelta(days=7), types
+        stats, _same_week, _week_start_end, timedelta(days=7), types
     )
 
 
-def same_month(time1: datetime, time2: datetime) -> bool:
-    """Return True if time1 and time2 are in the same year and month."""
-    date1 = dt_util.as_local(time1).date()
-    date2 = dt_util.as_local(time2).date()
-    return (date1.year, date1.month) == (date2.year, date2.month)
+def reduce_month_factory() -> (
+    tuple[
+        Callable[[datetime, datetime], bool],
+        Callable[[datetime], tuple[datetime, datetime]],
+    ]
+):
+    """Return functions to match same month and month start end."""
+    # We create _as_local_cached in the closure in case the timezone changes
+    _as_local_cached = lru_cache(maxsize=6)(dt_util.as_local)
 
+    def _same_month(time1: datetime, time2: datetime) -> bool:
+        """Return True if time1 and time2 are in the same year and month."""
+        if 2 < time1.day < 26 and 2 < time2.day < 26:
+            # No need to convert to local time if both dates are far
+            # enough from possible start or end of the month as time zones
+            # can't change more than 24 hours in a month.
+            return (time1.year, time1.month) == (time2.year, time2.month)
+        date1 = _as_local_cached(time1)
+        date2 = _as_local_cached(time2)
+        return (date1.year, date1.month) == (date2.year, date2.month)
 
-def month_start_end(time: datetime) -> tuple[datetime, datetime]:
-    """Return the start and end of the period (month) time is within."""
-    start_local = dt_util.as_local(time).replace(
-        day=1, hour=0, minute=0, second=0, microsecond=0
-    )
-    start = dt_util.as_utc(start_local)
-    end_local = (start_local + timedelta(days=31)).replace(day=1)
-    end = dt_util.as_utc(end_local)
-    return (start, end)
+    def _month_start_end(time: datetime) -> tuple[datetime, datetime]:
+        """Return the start and end of the period (month) time is within."""
+        start_local = _as_local_cached(time).replace(
+            day=1, hour=0, minute=0, second=0, microsecond=0
+        )
+        start = dt_util.as_utc(start_local)
+        end_local = (start_local + timedelta(days=31)).replace(day=1)
+        end = dt_util.as_utc(end_local)
+        return (start, end)
+
+    return _same_month, _month_start_end
 
 
 def _reduce_statistics_per_month(
@@ -1167,9 +1209,9 @@ def _reduce_statistics_per_month(
     types: set[Literal["last_reset", "max", "mean", "min", "state", "sum"]],
 ) -> dict[str, list[dict[str, Any]]]:
     """Reduce hourly statistics to monthly statistics."""
-
+    _same_month, _month_start_end = reduce_month_factory()
     return _reduce_statistics(
-        stats, same_month, month_start_end, timedelta(days=31), types
+        stats, _same_month, _month_start_end, timedelta(days=31), types
    )
 
 
@@ -1628,7 +1670,7 @@ def statistic_during_period(
         state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
     convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
 
-    return {key: convert(value) for key, value in result.items()}
+    return {key: convert(value) if convert else value for key, value in result.items()}
 
 
 def _statistics_during_period_with_session(
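The rewritten `_same_month` above also gains a fast path: when both timestamps fall on day 3-25, no real-world UTC offset (roughly -12 to +14 hours) can shift them across a month boundary, so the raw `(year, month)` tuples compare correctly without an `astimezone()` call. A small self-contained sketch of that guard (`same_month_fast_path` is a hypothetical name for illustration):

```python
from datetime import datetime, timezone


def same_month_fast_path(t1: datetime, t2: datetime) -> bool | None:
    """Return the answer when it is time-zone independent, else None."""
    if 2 < t1.day < 26 and 2 < t2.day < 26:
        # More than a day away from any month boundary: every offset agrees.
        return (t1.year, t1.month) == (t2.year, t2.month)
    return None  # near a boundary: must localize before comparing


# Mid-month timestamps need no astimezone(); any offset yields the same month.
assert same_month_fast_path(
    datetime(2023, 5, 10, tzinfo=timezone.utc),
    datetime(2023, 5, 20, tzinfo=timezone.utc),
) is True

# May 1 UTC may still be April 30 in a western zone, so the fast path declines.
assert same_month_fast_path(
    datetime(2023, 5, 1, tzinfo=timezone.utc),
    datetime(2023, 5, 20, tzinfo=timezone.utc),
) is None
```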
@@ -1952,77 +1994,76 @@ def _sorted_statistics_to_dict(
     result: dict = defaultdict(list)
     metadata = dict(_metadata.values())
     need_stat_at_start_time: set[int] = set()
-    stats_at_start_time = {}
-
-    def no_conversion(val: float | None) -> float | None:
-        """Return val."""
-        return val
+    start_time_ts = start_time.timestamp() if start_time else None
+
+    # Identify metadata IDs for which no data was available at the requested start time
+    stats_by_meta_id: dict[int, list[Row]] = {}
+    seen_statistic_ids: set[str] = set()
+    for meta_id, group in groupby(
+        stats,
+        lambda stat: stat.metadata_id,  # type: ignore[no-any-return]
+    ):
+        stats_list = stats_by_meta_id[meta_id] = list(group)
+        seen_statistic_ids.add(metadata[meta_id]["statistic_id"])
+        first_start_time_ts = stats_list[0].start_ts
+        if start_time_ts and first_start_time_ts > start_time_ts:
+            need_stat_at_start_time.add(meta_id)
 
     # Set all statistic IDs to empty lists in result set to maintain the order
     if statistic_ids is not None:
         for stat_id in statistic_ids:
-            result[stat_id] = []
-
-    # Identify metadata IDs for which no data was available at the requested start time
-    for meta_id, group in groupby(
-        stats,
-        lambda stat: stat.metadata_id,  # type: ignore[no-any-return]
-    ):
-        first_start_time = dt_util.utc_from_timestamp(next(group).start_ts)
-        if start_time and first_start_time > start_time:
-            need_stat_at_start_time.add(meta_id)
+            # Only set the statistic ID if it is in the data to
+            # avoid having to do a second loop to remove the
+            # statistic IDs that are not in the data at the end
+            if stat_id in seen_statistic_ids:
+                result[stat_id] = []
 
     # Fetch last known statistics for the needed metadata IDs
     if need_stat_at_start_time:
         assert start_time  # Can not be None if need_stat_at_start_time is not empty
-        tmp = _statistics_at_time(
+        if tmp := _statistics_at_time(
             session, need_stat_at_start_time, table, start_time, types
-        )
-        if tmp:
+        ):
             for stat in tmp:
-                stats_at_start_time[stat.metadata_id] = (stat,)
+                stats_by_meta_id[stat.metadata_id].insert(0, stat)
 
     # Append all statistic entries, and optionally do unit conversion
-    table_duration = table.duration
+    table_duration_seconds = table.duration.total_seconds()
     timestamp_to_datetime = dt_util.utc_from_timestamp
-    for meta_id, group in groupby(
-        stats,
-        lambda stat: stat.metadata_id,  # type: ignore[no-any-return]
-    ):
-        state_unit = unit = metadata[meta_id]["unit_of_measurement"]
-        statistic_id = metadata[meta_id]["statistic_id"]
-        if state := hass.states.get(statistic_id):
-            state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
+    for meta_id, stats_list in stats_by_meta_id.items():
+        metadata_by_id = metadata[meta_id]
+        statistic_id = metadata_by_id["statistic_id"]
         if convert_units:
+            state_unit = unit = metadata_by_id["unit_of_measurement"]
+            if state := hass.states.get(statistic_id):
+                state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
             convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
         else:
-            convert = no_conversion
-        ent_results = result[meta_id]
-        for db_state in chain(stats_at_start_time.get(meta_id, ()), group):
-            start = timestamp_to_datetime(db_state.start_ts)
+            convert = None
+        ent_results = result[statistic_id]
+        for db_state in stats_list:
+            start_ts = db_state.start_ts
             row: dict[str, Any] = {
-                "start": start,
-                "end": start + table_duration,
+                "start": timestamp_to_datetime(start_ts),
+                "end": timestamp_to_datetime(start_ts + table_duration_seconds),
             }
             if "mean" in types:
-                row["mean"] = convert(db_state.mean)
+                row["mean"] = convert(db_state.mean) if convert else db_state.mean
             if "min" in types:
-                row["min"] = convert(db_state.min)
+                row["min"] = convert(db_state.min) if convert else db_state.min
            if "max" in types:
-                row["max"] = convert(db_state.max)
+                row["max"] = convert(db_state.max) if convert else db_state.max
             if "last_reset" in types:
                 row["last_reset"] = timestamp_to_datetime_or_none(
                     db_state.last_reset_ts
                 )
             if "state" in types:
-                row["state"] = convert(db_state.state)
+                row["state"] = convert(db_state.state) if convert else db_state.state
             if "sum" in types:
-                row["sum"] = convert(db_state.sum)
+                row["sum"] = convert(db_state.sum) if convert else db_state.sum
             ent_results.append(row)
 
-    # Filter out the empty lists if some states had 0 results.
-    return {metadata[key]["statistic_id"]: val for key, val in result.items() if val}
+    return result
 
 
 def validate_statistics(hass: HomeAssistant) -> dict[str, list[ValidationIssue]]:
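`_sorted_statistics_to_dict` now materializes each metadata ID's rows in a single `groupby` pass and prepends the backfilled at-start-time row with `insert(0, ...)`, replacing the second `groupby` and the `chain()` over a side dict. A simplified sketch of that shape, using plain `(metadata_id, start_ts)` tuples in place of the real database rows:

```python
from itertools import groupby

# (metadata_id, start_ts) pairs, pre-sorted by metadata_id as the real query
# guarantees; timestamps are floats as in the *_ts schema columns.
rows = [(1, 200.0), (1, 300.0), (2, 100.0)]
start_time_ts = 100.0

stats_by_meta_id: dict[int, list[tuple[int, float]]] = {}
need_stat_at_start_time: set[int] = set()

for meta_id, group in groupby(rows, lambda row: row[0]):
    stats_list = stats_by_meta_id[meta_id] = list(group)  # materialize once
    if start_time_ts and stats_list[0][1] > start_time_ts:
        need_stat_at_start_time.add(meta_id)  # first row is after start_time

# Stand-in for the _statistics_at_time() query: the last known row at or
# before start_time for each metadata ID that needed one.
for stat in [(1, 100.0)]:
    stats_by_meta_id[stat[0]].insert(0, stat)  # prepend instead of chain()

assert need_stat_at_start_time == {1}
assert stats_by_meta_id == {1: [(1, 100.0), (1, 200.0), (1, 300.0)], 2: [(2, 100.0)]}
```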
diff --git a/homeassistant/util/dt.py b/homeassistant/util/dt.py
index 26f001236ec..09a7923aaaf 100644
--- a/homeassistant/util/dt.py
+++ b/homeassistant/util/dt.py
@@ -145,9 +145,10 @@ def as_local(dattim: dt.datetime) -> dt.datetime:
     return dattim.astimezone(DEFAULT_TIME_ZONE)
 
 
-def utc_from_timestamp(timestamp: float) -> dt.datetime:
-    """Return a UTC time from a timestamp."""
-    return dt.datetime.utcfromtimestamp(timestamp).replace(tzinfo=UTC)
+# We use a partial here to improve performance by avoiding the global lookup
+# of UTC and the function call overhead.
+utc_from_timestamp = partial(dt.datetime.fromtimestamp, tz=UTC)
+"""Return a UTC time from a timestamp."""
 
 
 def utc_to_timestamp(utc_dt: dt.datetime) -> float:
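The `partial` binds `tz=UTC` once, so each call dispatches straight into the C-implemented `datetime.fromtimestamp` with no Python wrapper frame and no intermediate naive datetime plus `.replace()` allocation, while returning the same aware result. A quick equivalence check (the `_old` helper mirrors the removed implementation):

```python
import datetime as dt
from functools import partial

UTC = dt.timezone.utc


def utc_from_timestamp_old(timestamp: float) -> dt.datetime:
    """The removed implementation: naive datetime plus a replace() call."""
    return dt.datetime.utcfromtimestamp(timestamp).replace(tzinfo=UTC)


# The new implementation: one bound C call, no Python-level function body.
utc_from_timestamp = partial(dt.datetime.fromtimestamp, tz=UTC)

ts = 1_700_000_000.123456
assert utc_from_timestamp(ts) == utc_from_timestamp_old(ts)
```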