From 9b8f94363c0b4ecd1434ac1ac3bb82febd3889d0 Mon Sep 17 00:00:00 2001
From: Erik Montnemery
Date: Wed, 16 Nov 2022 12:46:29 +0100
Subject: [PATCH] Fix statistic_during_period for data with holes (#81847)

---
 .../components/recorder/statistics.py         | 201 +++++++-----
 .../components/recorder/test_websocket_api.py | 289 +++++++++++++++---
 2 files changed, 375 insertions(+), 115 deletions(-)

diff --git a/homeassistant/components/recorder/statistics.py b/homeassistant/components/recorder/statistics.py
index 8a744fd4daa..e361249580f 100644
--- a/homeassistant/components/recorder/statistics.py
+++ b/homeassistant/components/recorder/statistics.py
@@ -1216,11 +1216,29 @@ def _get_max_mean_min_statistic(
     return result
 
 
+def _first_statistic(
+    session: Session,
+    table: type[Statistics | StatisticsShortTerm],
+    metadata_id: int,
+) -> datetime | None:
+    """Return the date of the oldest statistic row for a given metadata id."""
+    stmt = lambda_stmt(
+        lambda: select(table.start)
+        .filter(table.metadata_id == metadata_id)
+        .order_by(table.start.asc())
+        .limit(1)
+    )
+    if stats := execute_stmt_lambda_element(session, stmt):
+        return process_timestamp(stats[0].start)  # type: ignore[no-any-return]
+    return None
+
+
 def _get_oldest_sum_statistic(
     session: Session,
     head_start_time: datetime | None,
     main_start_time: datetime | None,
     tail_start_time: datetime | None,
+    oldest_stat: datetime | None,
     tail_only: bool,
     metadata_id: int,
 ) -> float | None:
@@ -1231,10 +1249,10 @@ def _get_oldest_sum_statistic(
         start_time: datetime | None,
         table: type[Statistics | StatisticsShortTerm],
         metadata_id: int,
-    ) -> tuple[float | None, datetime | None]:
+    ) -> float | None:
         """Return the oldest non-NULL sum during the period."""
         stmt = lambda_stmt(
-            lambda: select(table.sum, table.start)
+            lambda: select(table.sum)
             .filter(table.metadata_id == metadata_id)
             .filter(table.sum.is_not(None))
             .order_by(table.start.asc())
@@ -1248,49 +1266,49 @@ def _get_oldest_sum_statistic(
         else:
             period = start_time.replace(minute=0, second=0, microsecond=0)
             prev_period = period - table.duration
-            stmt += lambda q: q.filter(table.start == prev_period)
+            stmt += lambda q: q.filter(table.start >= prev_period)
         stats = execute_stmt_lambda_element(session, stmt)
-        return (
-            (stats[0].sum, process_timestamp(stats[0].start)) if stats else (None, None)
-        )
+        return stats[0].sum if stats else None
 
-    oldest_start: datetime | None
     oldest_sum: float | None = None
 
-    if head_start_time is not None:
-        oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
-            session, head_start_time, StatisticsShortTerm, metadata_id
+    # This function won't be called if tail_only is False and main_start_time is None;
+    # the extra checks are added to satisfy MyPy
+    if not tail_only and main_start_time is not None and oldest_stat is not None:
+        period = main_start_time.replace(minute=0, second=0, microsecond=0)
+        prev_period = period - Statistics.duration
+        if prev_period < oldest_stat:
+            return 0
+
+    if (
+        head_start_time is not None
+        and (
+            oldest_sum := _get_oldest_sum_statistic_in_sub_period(
+                session, head_start_time, StatisticsShortTerm, metadata_id
+            )
         )
-        if (
-            oldest_start is not None
-            and oldest_start < head_start_time
-            and oldest_sum is not None
-        ):
-            return oldest_sum
+        is not None
+    ):
+        return oldest_sum
 
     if not tail_only:
-        assert main_start_time is not None
-        oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
-            session, main_start_time, Statistics, metadata_id
-        )
         if (
-            oldest_start is not None
-            and oldest_start < main_start_time
-            and oldest_sum is not None
-        ):
+            oldest_sum := _get_oldest_sum_statistic_in_sub_period(
+                session, main_start_time, Statistics, metadata_id
+            )
+        ) is not None:
             return oldest_sum
         return 0
 
-    if tail_start_time is not None:
-        oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
-            session, tail_start_time, StatisticsShortTerm, metadata_id
+    if (
+        tail_start_time is not None
+        and (
+            oldest_sum := _get_oldest_sum_statistic_in_sub_period(
+                session, tail_start_time, StatisticsShortTerm, metadata_id
+            )
         )
-        if (
-            oldest_start is not None
-            and oldest_start < tail_start_time
-            and oldest_sum is not None
-        ):
-            return oldest_sum
+    ) is not None:
+        return oldest_sum
 
     return 0
 
@@ -1373,51 +1391,79 @@ def statistic_during_period(
 
     result: dict[str, Any] = {}
 
-    # To calculate the summary, data from the statistics (hourly) and short_term_statistics
-    # (5 minute) tables is combined
-    # - The short term statistics table is used for the head and tail of the period,
-    #   if the period it doesn't start or end on a full hour
-    # - The statistics table is used for the remainder of the time
-    now = dt_util.utcnow()
-    if end_time is not None and end_time > now:
-        end_time = now
-
-    tail_only = (
-        start_time is not None
-        and end_time is not None
-        and end_time - start_time < timedelta(hours=1)
-    )
-
-    # Calculate the head period
-    head_start_time: datetime | None = None
-    head_end_time: datetime | None = None
-    if not tail_only and start_time is not None and start_time.minute:
-        head_start_time = start_time
-        head_end_time = start_time.replace(
-            minute=0, second=0, microsecond=0
-        ) + timedelta(hours=1)
-
-    # Calculate the tail period
-    tail_start_time: datetime | None = None
-    tail_end_time: datetime | None = None
-    if end_time is None:
-        tail_start_time = now.replace(minute=0, second=0, microsecond=0)
-    elif end_time.minute:
-        tail_start_time = (
-            start_time
-            if tail_only
-            else end_time.replace(minute=0, second=0, microsecond=0)
-        )
-        tail_end_time = end_time
-
-    # Calculate the main period
-    main_start_time: datetime | None = None
-    main_end_time: datetime | None = None
-    if not tail_only:
-        main_start_time = start_time if head_end_time is None else head_end_time
-        main_end_time = end_time if tail_start_time is None else tail_start_time
-
     with session_scope(hass=hass) as session:
+        # Fetch metadata for the given statistic_id
+        if not (
+            metadata := get_metadata_with_session(session, statistic_ids=[statistic_id])
+        ):
+            return result
+
+        metadata_id = metadata[statistic_id][0]
+
+        oldest_stat = _first_statistic(session, Statistics, metadata_id)
+        oldest_5_min_stat = None
+        if not valid_statistic_id(statistic_id):
+            oldest_5_min_stat = _first_statistic(
+                session, StatisticsShortTerm, metadata_id
+            )
+
+        # To calculate the summary, data from the statistics (hourly) and
+        # short_term_statistics (5 minute) tables is combined
+        # - The short term statistics table is used for the head and tail of the period,
+        #   if the period doesn't start or end on a full hour
+        # - The statistics table is used for the remainder of the time
+        now = dt_util.utcnow()
+        if end_time is not None and end_time > now:
+            end_time = now
+
+        tail_only = (
+            start_time is not None
+            and end_time is not None
+            and end_time - start_time < timedelta(hours=1)
+        )
+
+        # Calculate the head period
+        head_start_time: datetime | None = None
+        head_end_time: datetime | None = None
+        if (
+            not tail_only
+            and oldest_stat is not None
+            and oldest_5_min_stat is not None
+            and oldest_5_min_stat - oldest_stat < timedelta(hours=1)
+            and (start_time is None or start_time < oldest_5_min_stat)
+        ):
+            # To improve the accuracy of averages for statistics which were added
+            # within recorder's retention period.
+            head_start_time = oldest_5_min_stat
+            head_end_time = oldest_5_min_stat.replace(
+                minute=0, second=0, microsecond=0
+            ) + timedelta(hours=1)
+        elif not tail_only and start_time is not None and start_time.minute:
+            head_start_time = start_time
+            head_end_time = start_time.replace(
+                minute=0, second=0, microsecond=0
+            ) + timedelta(hours=1)
+
+        # Calculate the tail period
+        tail_start_time: datetime | None = None
+        tail_end_time: datetime | None = None
+        if end_time is None:
+            tail_start_time = now.replace(minute=0, second=0, microsecond=0)
+        elif end_time.minute:
+            tail_start_time = (
+                start_time
+                if tail_only
+                else end_time.replace(minute=0, second=0, microsecond=0)
+            )
+            tail_end_time = end_time
+
+        # Calculate the main period
+        main_start_time: datetime | None = None
+        main_end_time: datetime | None = None
+        if not tail_only:
+            main_start_time = start_time if head_end_time is None else head_end_time
+            main_end_time = end_time if tail_start_time is None else tail_start_time
+
         # Fetch metadata for the given statistic_id
         metadata = get_metadata_with_session(session, statistic_ids=[statistic_id])
         if not metadata:
@@ -1449,6 +1495,7 @@ def statistic_during_period(
             head_start_time,
             main_start_time,
             tail_start_time,
+            oldest_stat,
             tail_only,
             metadata_id,
         )
diff --git a/tests/components/recorder/test_websocket_api.py b/tests/components/recorder/test_websocket_api.py
index 00e9d0d35b4..423ab4bbc52 100644
--- a/tests/components/recorder/test_websocket_api.py
+++ b/tests/components/recorder/test_websocket_api.py
@@ -182,7 +182,8 @@ async def test_statistics_during_period(recorder_mock, hass, hass_ws_client):
 
 
 @freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
-async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
+@pytest.mark.parametrize("offset", (0, 1, 2))
+async def test_statistic_during_period(recorder_mock, hass, hass_ws_client, offset):
     """Test statistic_during_period."""
     id = 1
 
@@ -197,7 +198,9 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     client = await hass_ws_client()
 
     zero = now
-    start = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=-3)
+    start = zero.replace(minute=offset * 5, second=0, microsecond=0) + timedelta(
+        hours=-3
+    )
 
     imported_stats_5min = [
         {
@@ -209,22 +212,37 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
         }
         for i in range(0, 39)
     ]
-    imported_stats = [
+    imported_stats = []
+    slice_end = 12 - offset
+    imported_stats.append(
         {
-            "start": imported_stats_5min[i * 12]["start"],
-            "max": max(
-                stat["max"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
-            ),
-            "mean": fmean(
-                stat["mean"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
-            ),
-            "min": min(
-                stat["min"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
-            ),
-            "sum": imported_stats_5min[i * 12 + 11]["sum"],
+            "start": imported_stats_5min[0]["start"].replace(minute=0),
+            "max": max(stat["max"] for stat in imported_stats_5min[0:slice_end]),
+            "mean": fmean(stat["mean"] for stat in imported_stats_5min[0:slice_end]),
+            "min": min(stat["min"] for stat in imported_stats_5min[0:slice_end]),
+            "sum": imported_stats_5min[slice_end - 1]["sum"],
         }
-        for i in range(0, 3)
-    ]
+    )
+    for i in range(0, 2):
+        slice_start = i * 12 + (12 - offset)
+        slice_end = (i + 1) * 12 + (12 - offset)
+        assert imported_stats_5min[slice_start]["start"].minute == 0
+        imported_stats.append(
+            {
+                "start": imported_stats_5min[slice_start]["start"],
+                "max": max(
+                    stat["max"] for stat in imported_stats_5min[slice_start:slice_end]
+                ),
+                "mean": fmean(
+                    stat["mean"] for stat in imported_stats_5min[slice_start:slice_end]
+                ),
+                "min": min(
+                    stat["min"] for stat in imported_stats_5min[slice_start:slice_end]
+                ),
+                "sum": imported_stats_5min[slice_end - 1]["sum"],
+            }
+        )
+
     imported_metadata = {
         "has_mean": False,
         "has_sum": True,
         "name": "Total imported energy",
         "source": "recorder",
         "statistic_id": "sensor.test",
         "unit_of_measurement": "kWh",
     }
@@ -285,8 +303,14 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
     # This should also include imported_statistics_5min[:]
-    start_time = "2022-10-21T04:00:00+00:00"
-    end_time = "2022-10-21T07:15:00+00:00"
+    start_time = (
+        dt_util.parse_datetime("2022-10-21T04:00:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
+    end_time = (
+        dt_util.parse_datetime("2022-10-21T07:15:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     await client.send_json(
         {
             "id": next_id(),
@@ -308,8 +332,14 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
     # This should also include imported_statistics_5min[:]
-    start_time = "2022-10-20T04:00:00+00:00"
-    end_time = "2022-10-21T08:20:00+00:00"
+    start_time = (
+        dt_util.parse_datetime("2022-10-20T04:00:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
+    end_time = (
+        dt_util.parse_datetime("2022-10-21T08:20:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     await client.send_json(
         {
             "id": next_id(),
@@ -331,7 +361,10 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
     # This should include imported_statistics_5min[26:]
-    start_time = "2022-10-21T06:10:00+00:00"
+    start_time = (
+        dt_util.parse_datetime("2022-10-21T06:10:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     assert imported_stats_5min[26]["start"].isoformat() == start_time
     await client.send_json(
         {
@@ -353,7 +386,10 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
     # This should also include imported_statistics_5min[26:]
-    start_time = "2022-10-21T06:09:00+00:00"
+    start_time = (
+        dt_util.parse_datetime("2022-10-21T06:09:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     await client.send_json(
         {
             "id": next_id(),
@@ -374,7 +410,10 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
     # This should include imported_statistics_5min[:26]
-    end_time = "2022-10-21T06:10:00+00:00"
+    end_time = (
+        dt_util.parse_datetime("2022-10-21T06:10:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     assert imported_stats_5min[26]["start"].isoformat() == end_time
     await client.send_json(
         {
@@ -396,9 +435,15 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
     # This should include imported_statistics_5min[26:32] (less than a full hour)
-    start_time = "2022-10-21T06:10:00+00:00"
+    start_time = (
+        dt_util.parse_datetime("2022-10-21T06:10:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     assert imported_stats_5min[26]["start"].isoformat() == start_time
-    end_time = "2022-10-21T06:40:00+00:00"
+    end_time = (
+        dt_util.parse_datetime("2022-10-21T06:40:00+00:00")
+        + timedelta(minutes=5 * offset)
+    ).isoformat()
     assert imported_stats_5min[32]["start"].isoformat() == end_time
     await client.send_json(
         {
@@ -422,7 +467,7 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
 
     # This should include imported_statistics[2:] + imported_statistics_5min[36:]
     start_time = "2022-10-21T06:00:00+00:00"
-    assert imported_stats_5min[24]["start"].isoformat() == start_time
+    assert imported_stats_5min[24 - offset]["start"].isoformat() == start_time
     assert imported_stats[2]["start"].isoformat() == start_time
     await client.send_json(
         {
@@ -437,10 +482,11 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     response = await client.receive_json()
     assert response["success"]
     assert response["result"] == {
-        "max": max(stat["max"] for stat in imported_stats_5min[24:]),
-        "mean": fmean(stat["mean"] for stat in imported_stats_5min[24:]),
-        "min": min(stat["min"] for stat in imported_stats_5min[24:]),
-        "change": imported_stats_5min[-1]["sum"] - imported_stats_5min[23]["sum"],
+        "max": max(stat["max"] for stat in imported_stats_5min[24 - offset :]),
+        "mean": fmean(stat["mean"] for stat in imported_stats_5min[24 - offset :]),
+        "min": min(stat["min"] for stat in imported_stats_5min[24 - offset :]),
+        "change": imported_stats_5min[-1]["sum"]
+        - imported_stats_5min[23 - offset]["sum"],
     }
 
     # This should also include imported_statistics[2:] + imported_statistics_5min[36:]
@@ -457,10 +503,11 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     response = await client.receive_json()
     assert response["success"]
     assert response["result"] == {
-        "max": max(stat["max"] for stat in imported_stats_5min[24:]),
-        "mean": fmean(stat["mean"] for stat in imported_stats_5min[24:]),
-        "min": min(stat["min"] for stat in imported_stats_5min[24:]),
-        "change": imported_stats_5min[-1]["sum"] - imported_stats_5min[23]["sum"],
+        "max": max(stat["max"] for stat in imported_stats_5min[24 - offset :]),
+        "mean": fmean(stat["mean"] for stat in imported_stats_5min[24 - offset :]),
+        "min": min(stat["min"] for stat in imported_stats_5min[24 - offset :]),
+        "change": imported_stats_5min[-1]["sum"]
+        - imported_stats_5min[23 - offset]["sum"],
     }
 
     # This should include imported_statistics[2:3]
@@ -477,11 +524,16 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     )
     response = await client.receive_json()
     assert response["success"]
+    slice_start = 24 - offset
+    slice_end = 36 - offset
     assert response["result"] == {
-        "max": max(stat["max"] for stat in imported_stats_5min[24:36]),
-        "mean": fmean(stat["mean"] for stat in imported_stats_5min[24:36]),
-        "min": min(stat["min"] for stat in imported_stats_5min[24:36]),
-        "change": imported_stats_5min[35]["sum"] - imported_stats_5min[23]["sum"],
+        "max": max(stat["max"] for stat in imported_stats_5min[slice_start:slice_end]),
+        "mean": fmean(
+            stat["mean"] for stat in imported_stats_5min[slice_start:slice_end]
+        ),
+        "min": min(stat["min"] for stat in imported_stats_5min[slice_start:slice_end]),
+        "change": imported_stats_5min[slice_end - 1]["sum"]
+        - imported_stats_5min[slice_start - 1]["sum"],
     }
 
     # Test we can get only selected types
@@ -539,6 +591,167 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
     }
 
 
+@freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
+async def test_statistic_during_period_hole(recorder_mock, hass, hass_ws_client):
+    """Test statistic_during_period when there are holes in the data."""
+    id = 1
+
+    def next_id():
+        nonlocal id
+        id += 1
+        return id
+
+    now = dt_util.utcnow()
+
+    await async_recorder_block_till_done(hass)
+    client = await hass_ws_client()
+
+    zero = now
+    start = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=-18)
+
+    imported_stats = [
+        {
+            "start": (start + timedelta(hours=3 * i)),
+            "max": i * 2,
+            "mean": i,
+            "min": -76 + i * 2,
+            "sum": i,
+        }
+        for i in range(0, 6)
+    ]
+
+    imported_metadata = {
+        "has_mean": False,
+        "has_sum": True,
+        "name": "Total imported energy",
+        "source": "recorder",
+        "statistic_id": "sensor.test",
+        "unit_of_measurement": "kWh",
+    }
+
+    recorder.get_instance(hass).async_import_statistics(
+        imported_metadata,
+        imported_stats,
+        Statistics,
+    )
+    await async_wait_recording_done(hass)
+
+    # This should include imported_stats[:]
+    await client.send_json(
+        {
+            "id": next_id(),
+            "type": "recorder/statistic_during_period",
+            "statistic_id": "sensor.test",
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {
+        "max": max(stat["max"] for stat in imported_stats[:]),
+        "mean": fmean(stat["mean"] for stat in imported_stats[:]),
+        "min": min(stat["min"] for stat in imported_stats[:]),
+        "change": imported_stats[-1]["sum"] - imported_stats[0]["sum"],
+    }
+
+    # This should also include imported_stats[:]
+    start_time = "2022-10-20T13:00:00+00:00"
+    end_time = "2022-10-21T05:00:00+00:00"
+    assert imported_stats[0]["start"].isoformat() == start_time
+    assert imported_stats[-1]["start"].isoformat() < end_time
+    await client.send_json(
+        {
+            "id": next_id(),
+            "type": "recorder/statistic_during_period",
+            "statistic_id": "sensor.test",
+            "fixed_period": {
+                "start_time": start_time,
+                "end_time": end_time,
+            },
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {
+        "max": max(stat["max"] for stat in imported_stats[:]),
+        "mean": fmean(stat["mean"] for stat in imported_stats[:]),
+        "min": min(stat["min"] for stat in imported_stats[:]),
+        "change": imported_stats[-1]["sum"] - imported_stats[0]["sum"],
+    }
+
+    # This should also include imported_stats[:]
+    start_time = "2022-10-20T13:00:00+00:00"
+    end_time = "2022-10-21T08:20:00+00:00"
+    await client.send_json(
+        {
+            "id": next_id(),
+            "type": "recorder/statistic_during_period",
+            "statistic_id": "sensor.test",
+            "fixed_period": {
+                "start_time": start_time,
+                "end_time": end_time,
+            },
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {
+        "max": max(stat["max"] for stat in imported_stats[:]),
+        "mean": fmean(stat["mean"] for stat in imported_stats[:]),
+        "min": min(stat["min"] for stat in imported_stats[:]),
+        "change": imported_stats[-1]["sum"] - imported_stats[0]["sum"],
+    }
+
+    # This should include imported_stats[1:4]
+    start_time = "2022-10-20T16:00:00+00:00"
+    end_time = "2022-10-20T23:00:00+00:00"
+    assert imported_stats[1]["start"].isoformat() == start_time
+    assert imported_stats[3]["start"].isoformat() < end_time
+    await client.send_json(
+        {
+            "id": next_id(),
+            "type": "recorder/statistic_during_period",
+            "statistic_id": "sensor.test",
+            "fixed_period": {
+                "start_time": start_time,
+                "end_time": end_time,
+            },
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {
+        "max": max(stat["max"] for stat in imported_stats[1:4]),
+        "mean": fmean(stat["mean"] for stat in imported_stats[1:4]),
+        "min": min(stat["min"] for stat in imported_stats[1:4]),
+        "change": imported_stats[3]["sum"] - imported_stats[1]["sum"],
+    }
+
+    # This should also include imported_stats[1:4]
+    start_time = "2022-10-20T15:00:00+00:00"
+    end_time = "2022-10-21T00:00:00+00:00"
+    assert imported_stats[1]["start"].isoformat() > start_time
+    assert imported_stats[3]["start"].isoformat() < end_time
+    await client.send_json(
+        {
+            "id": next_id(),
+            "type": "recorder/statistic_during_period",
+            "statistic_id": "sensor.test",
+            "fixed_period": {
+                "start_time": start_time,
+                "end_time": end_time,
+            },
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {
+        "max": max(stat["max"] for stat in imported_stats[1:4]),
+        "mean": fmean(stat["mean"] for stat in imported_stats[1:4]),
+        "min": min(stat["min"] for stat in imported_stats[1:4]),
+        "change": imported_stats[3]["sum"] - imported_stats[1]["sum"],
+    }
+
+
 @freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
 @pytest.mark.parametrize(
     "calendar_period, start_time, end_time",