Fix statistic_during_period for data with holes (#81847)

Erik Montnemery 2022-11-16 12:46:29 +01:00 committed by GitHub
parent a91abebea8
commit 9b8f94363c
2 changed files with 375 additions and 115 deletions


@@ -1216,11 +1216,29 @@ def _get_max_mean_min_statistic(
return result
def _first_statistic(
session: Session,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
) -> datetime | None:
"""Return the data of the oldest statistic row for a given metadata id."""
stmt = lambda_stmt(
lambda: select(table.start)
.filter(table.metadata_id == metadata_id)
.order_by(table.start.asc())
.limit(1)
)
if stats := execute_stmt_lambda_element(session, stmt):
return process_timestamp(stats[0].start) # type: ignore[no-any-return]
return None
def _get_oldest_sum_statistic(
session: Session,
head_start_time: datetime | None,
main_start_time: datetime | None,
tail_start_time: datetime | None,
oldest_stat: datetime | None,
tail_only: bool,
metadata_id: int,
) -> float | None:
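For orientation: _first_statistic returns the start of the oldest row for a metadata id, and _get_oldest_sum_statistic now receives that value as oldest_stat so it can tell when the requested period begins before any recorded data. A minimal sketch of the same oldest-row query pattern, assuming only a SQLAlchemy mapped class with start and metadata_id columns (the table argument is a hypothetical stand-in, not the recorder schema):

from datetime import datetime

from sqlalchemy import select
from sqlalchemy.orm import Session


def first_start(session: Session, table, metadata_id: int) -> datetime | None:
    # Oldest row first: ascending order on "start", fetch a single row.
    stmt = (
        select(table.start)
        .filter(table.metadata_id == metadata_id)
        .order_by(table.start.asc())
        .limit(1)
    )
    row = session.execute(stmt).first()
    return row[0] if row else None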
@@ -1231,10 +1249,10 @@ def _get_oldest_sum_statistic(
start_time: datetime | None,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
) -> tuple[float | None, datetime | None]:
) -> float | None:
"""Return the oldest non-NULL sum during the period."""
stmt = lambda_stmt(
lambda: select(table.sum, table.start)
lambda: select(table.sum)
.filter(table.metadata_id == metadata_id)
.filter(table.sum.is_not(None))
.order_by(table.start.asc())
@@ -1248,49 +1266,49 @@ def _get_oldest_sum_statistic(
else:
period = start_time.replace(minute=0, second=0, microsecond=0)
prev_period = period - table.duration
stmt += lambda q: q.filter(table.start == prev_period)
stmt += lambda q: q.filter(table.start >= prev_period)
stats = execute_stmt_lambda_element(session, stmt)
return (
(stats[0].sum, process_timestamp(stats[0].start)) if stats else (None, None)
)
return stats[0].sum if stats else None
oldest_start: datetime | None
oldest_sum: float | None = None
if head_start_time is not None:
oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
session, head_start_time, StatisticsShortTerm, metadata_id
# This function won't be called if tail_only is False and main_start_time is None;
# the extra checks are added to satisfy MyPy
if not tail_only and main_start_time is not None and oldest_stat is not None:
period = main_start_time.replace(minute=0, second=0, microsecond=0)
prev_period = period - Statistics.duration
if prev_period < oldest_stat:
return 0
if (
head_start_time is not None
and (
oldest_sum := _get_oldest_sum_statistic_in_sub_period(
session, head_start_time, StatisticsShortTerm, metadata_id
)
)
if (
oldest_start is not None
and oldest_start < head_start_time
and oldest_sum is not None
):
return oldest_sum
is not None
):
return oldest_sum
if not tail_only:
assert main_start_time is not None
oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
session, main_start_time, Statistics, metadata_id
)
if (
oldest_start is not None
and oldest_start < main_start_time
and oldest_sum is not None
):
oldest_sum := _get_oldest_sum_statistic_in_sub_period(
session, main_start_time, Statistics, metadata_id
)
) is not None:
return oldest_sum
return 0
if tail_start_time is not None:
oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
session, tail_start_time, StatisticsShortTerm, metadata_id
if (
tail_start_time is not None
and (
oldest_sum := _get_oldest_sum_statistic_in_sub_period(
session, tail_start_time, StatisticsShortTerm, metadata_id
)
)
if (
oldest_start is not None
and oldest_start < tail_start_time
and oldest_sum is not None
):
return oldest_sum
) is not None:
return oldest_sum
return 0
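The heart of the hole fix is in this hunk: the previous-period lookup changed from table.start == prev_period to table.start >= prev_period, so a missing row immediately before a sub-period no longer makes the oldest-sum query come back empty, and a period that starts before the oldest hourly statistic now short-circuits to 0. A standalone sketch of why the equality test fails across a gap, using hypothetical in-memory data rather than the recorder schema:

from datetime import datetime

# Running sums keyed by period start; the 14:00 row is missing (a hole).
rows = {
    datetime(2022, 10, 21, 13): 10.0,
    datetime(2022, 10, 21, 15): 30.0,
}
prev_period = datetime(2022, 10, 21, 14)

# Old behaviour: an exact match on the previous period finds nothing.
assert rows.get(prev_period) is None

# New behaviour: the earliest row at or after the previous period is used.
later = min((start for start in rows if start >= prev_period), default=None)
assert later is not None and rows[later] == 30.0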
@@ -1373,51 +1391,79 @@ def statistic_during_period(
result: dict[str, Any] = {}
# To calculate the summary, data from the statistics (hourly) and short_term_statistics
# (5 minute) tables is combined
# - The short term statistics table is used for the head and tail of the period,
# if the period doesn't start or end on a full hour
# - The statistics table is used for the remainder of the time
now = dt_util.utcnow()
if end_time is not None and end_time > now:
end_time = now
tail_only = (
start_time is not None
and end_time is not None
and end_time - start_time < timedelta(hours=1)
)
# Calculate the head period
head_start_time: datetime | None = None
head_end_time: datetime | None = None
if not tail_only and start_time is not None and start_time.minute:
head_start_time = start_time
head_end_time = start_time.replace(
minute=0, second=0, microsecond=0
) + timedelta(hours=1)
# Calculate the tail period
tail_start_time: datetime | None = None
tail_end_time: datetime | None = None
if end_time is None:
tail_start_time = now.replace(minute=0, second=0, microsecond=0)
elif end_time.minute:
tail_start_time = (
start_time
if tail_only
else end_time.replace(minute=0, second=0, microsecond=0)
)
tail_end_time = end_time
# Calculate the main period
main_start_time: datetime | None = None
main_end_time: datetime | None = None
if not tail_only:
main_start_time = start_time if head_end_time is None else head_end_time
main_end_time = end_time if tail_start_time is None else tail_start_time
with session_scope(hass=hass) as session:
# Fetch metadata for the given statistic_id
if not (
metadata := get_metadata_with_session(session, statistic_ids=[statistic_id])
):
return result
metadata_id = metadata[statistic_id][0]
oldest_stat = _first_statistic(session, Statistics, metadata_id)
oldest_5_min_stat = None
if not valid_statistic_id(statistic_id):
oldest_5_min_stat = _first_statistic(
session, StatisticsShortTerm, metadata_id
)
# To calculate the summary, data from the statistics (hourly) and
# short_term_statistics (5 minute) tables is combined
# - The short term statistics table is used for the head and tail of the period,
# if the period doesn't start or end on a full hour
# - The statistics table is used for the remainder of the time
now = dt_util.utcnow()
if end_time is not None and end_time > now:
end_time = now
tail_only = (
start_time is not None
and end_time is not None
and end_time - start_time < timedelta(hours=1)
)
# Calculate the head period
head_start_time: datetime | None = None
head_end_time: datetime | None = None
if (
not tail_only
and oldest_stat is not None
and oldest_5_min_stat is not None
and oldest_5_min_stat - oldest_stat < timedelta(hours=1)
and (start_time is None or start_time < oldest_5_min_stat)
):
# To improve accuracy of averages for statistics which were added within
# recorder's retention period.
head_start_time = oldest_5_min_stat
head_end_time = oldest_5_min_stat.replace(
minute=0, second=0, microsecond=0
) + timedelta(hours=1)
elif not tail_only and start_time is not None and start_time.minute:
head_start_time = start_time
head_end_time = start_time.replace(
minute=0, second=0, microsecond=0
) + timedelta(hours=1)
# Calculate the tail period
tail_start_time: datetime | None = None
tail_end_time: datetime | None = None
if end_time is None:
tail_start_time = now.replace(minute=0, second=0, microsecond=0)
elif end_time.minute:
tail_start_time = (
start_time
if tail_only
else end_time.replace(minute=0, second=0, microsecond=0)
)
tail_end_time = end_time
# Calculate the main period
main_start_time: datetime | None = None
main_end_time: datetime | None = None
if not tail_only:
main_start_time = start_time if head_end_time is None else head_end_time
main_end_time = end_time if tail_start_time is None else tail_start_time
# Fetch metadata for the given statistic_id
metadata = get_metadata_with_session(session, statistic_ids=[statistic_id])
if not metadata:
@@ -1449,6 +1495,7 @@ def statistic_during_period(
head_start_time,
main_start_time,
tail_start_time,
oldest_stat,
tail_only,
metadata_id,
)
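To summarize the control flow changed above: statistic_during_period splits the requested window into up to three sub-periods, a head and a tail served from the 5-minute short-term table wherever the window is not aligned to full hours, and a main period served from the hourly table. After this commit the head is also widened when the hourly data starts less than an hour before the 5-minute data, to improve the accuracy of averages. A simplified, standalone sketch of the hour-alignment split only, assuming both bounds are given and ignoring the oldest-statistic widening:

from datetime import datetime, timedelta


def split_period(start: datetime, end: datetime):
    # Returns (head, main, tail), each a (start, end) tuple or None.
    if end - start < timedelta(hours=1):
        # Shorter than an hour: serve everything from 5-minute data.
        return None, None, (start, end)
    head = None
    if start.minute:
        head_end = start.replace(minute=0, second=0, microsecond=0) + timedelta(
            hours=1
        )
        head = (start, head_end)
    tail = None
    if end.minute:
        tail = (end.replace(minute=0, second=0, microsecond=0), end)
    main_start = head[1] if head else start
    main_end = tail[0] if tail else end
    return head, (main_start, main_end), tail


head, main, tail = split_period(
    datetime(2022, 10, 21, 4, 10), datetime(2022, 10, 21, 7, 15)
)
assert head == (datetime(2022, 10, 21, 4, 10), datetime(2022, 10, 21, 5))
assert main == (datetime(2022, 10, 21, 5), datetime(2022, 10, 21, 7))
assert tail == (datetime(2022, 10, 21, 7), datetime(2022, 10, 21, 7, 15))

The second file in the diff, below, extends the tests: the existing test is parametrized over a 5-minute offset so window boundaries no longer line up with the hourly buckets, and a new test covers data with three-hour gaps.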


@@ -182,7 +182,8 @@ async def test_statistics_during_period(recorder_mock, hass, hass_ws_client):
@freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
@pytest.mark.parametrize("offset", (0, 1, 2))
async def test_statistic_during_period(recorder_mock, hass, hass_ws_client, offset):
"""Test statistic_during_period."""
id = 1
@@ -197,7 +198,9 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
client = await hass_ws_client()
zero = now
start = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=-3)
start = zero.replace(minute=offset * 5, second=0, microsecond=0) + timedelta(
hours=-3
)
imported_stats_5min = [
{
@@ -209,22 +212,37 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
for i in range(0, 39)
]
imported_stats = [
imported_stats = []
slice_end = 12 - offset
imported_stats.append(
{
"start": imported_stats_5min[i * 12]["start"],
"max": max(
stat["max"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
),
"mean": fmean(
stat["mean"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
),
"min": min(
stat["min"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
),
"sum": imported_stats_5min[i * 12 + 11]["sum"],
"start": imported_stats_5min[0]["start"].replace(minute=0),
"max": max(stat["max"] for stat in imported_stats_5min[0:slice_end]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[0:slice_end]),
"min": min(stat["min"] for stat in imported_stats_5min[0:slice_end]),
"sum": imported_stats_5min[slice_end - 1]["sum"],
}
for i in range(0, 3)
]
)
for i in range(0, 2):
slice_start = i * 12 + (12 - offset)
slice_end = (i + 1) * 12 + (12 - offset)
assert imported_stats_5min[slice_start]["start"].minute == 0
imported_stats.append(
{
"start": imported_stats_5min[slice_start]["start"],
"max": max(
stat["max"] for stat in imported_stats_5min[slice_start:slice_end]
),
"mean": fmean(
stat["mean"] for stat in imported_stats_5min[slice_start:slice_end]
),
"min": min(
stat["min"] for stat in imported_stats_5min[slice_start:slice_end]
),
"sum": imported_stats_5min[slice_end - 1]["sum"],
}
)
imported_metadata = {
"has_mean": False,
"has_sum": True,
@@ -285,8 +303,14 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
# This should also include imported_statistics_5min[:]
start_time = "2022-10-21T04:00:00+00:00"
end_time = "2022-10-21T07:15:00+00:00"
start_time = (
dt_util.parse_datetime("2022-10-21T04:00:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
end_time = (
dt_util.parse_datetime("2022-10-21T07:15:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
await client.send_json(
{
"id": next_id(),
@@ -308,8 +332,14 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
# This should also include imported_statistics_5min[:]
start_time = "2022-10-20T04:00:00+00:00"
end_time = "2022-10-21T08:20:00+00:00"
start_time = (
dt_util.parse_datetime("2022-10-21T04:00:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
end_time = (
dt_util.parse_datetime("2022-10-21T08:20:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
await client.send_json(
{
"id": next_id(),
@@ -331,7 +361,10 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
# This should include imported_statistics_5min[26:]
start_time = "2022-10-21T06:10:00+00:00"
start_time = (
dt_util.parse_datetime("2022-10-21T06:10:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
assert imported_stats_5min[26]["start"].isoformat() == start_time
await client.send_json(
{
@@ -353,7 +386,10 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
# This should also include imported_statistics_5min[26:]
start_time = "2022-10-21T06:09:00+00:00"
start_time = (
dt_util.parse_datetime("2022-10-21T06:09:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
await client.send_json(
{
"id": next_id(),
@@ -374,7 +410,10 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
# This should include imported_statistics_5min[:26]
end_time = "2022-10-21T06:10:00+00:00"
end_time = (
dt_util.parse_datetime("2022-10-21T06:10:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
assert imported_stats_5min[26]["start"].isoformat() == end_time
await client.send_json(
{
@@ -396,9 +435,15 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
# This should include imported_statistics_5min[26:32] (less than a full hour)
start_time = "2022-10-21T06:10:00+00:00"
start_time = (
dt_util.parse_datetime("2022-10-21T06:10:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
assert imported_stats_5min[26]["start"].isoformat() == start_time
end_time = "2022-10-21T06:40:00+00:00"
end_time = (
dt_util.parse_datetime("2022-10-21T06:40:00+00:00")
+ timedelta(minutes=5 * offset)
).isoformat()
assert imported_stats_5min[32]["start"].isoformat() == end_time
await client.send_json(
{
@@ -422,7 +467,7 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
# This should include imported_statistics[2:] + imported_statistics_5min[36:]
start_time = "2022-10-21T06:00:00+00:00"
assert imported_stats_5min[24]["start"].isoformat() == start_time
assert imported_stats_5min[24 - offset]["start"].isoformat() == start_time
assert imported_stats[2]["start"].isoformat() == start_time
await client.send_json(
{
@@ -437,10 +482,11 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[24:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24:]),
"min": min(stat["min"] for stat in imported_stats_5min[24:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[23]["sum"],
"max": max(stat["max"] for stat in imported_stats_5min[24 - offset :]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24 - offset :]),
"min": min(stat["min"] for stat in imported_stats_5min[24 - offset :]),
"change": imported_stats_5min[-1]["sum"]
- imported_stats_5min[23 - offset]["sum"],
}
# This should also include imported_statistics[2:] + imported_statistics_5min[36:]
@@ -457,10 +503,11 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[24:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24:]),
"min": min(stat["min"] for stat in imported_stats_5min[24:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[23]["sum"],
"max": max(stat["max"] for stat in imported_stats_5min[24 - offset :]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24 - offset :]),
"min": min(stat["min"] for stat in imported_stats_5min[24 - offset :]),
"change": imported_stats_5min[-1]["sum"]
- imported_stats_5min[23 - offset]["sum"],
}
# This should include imported_statistics[2:3]
@@ -477,11 +524,16 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
)
response = await client.receive_json()
assert response["success"]
slice_start = 24 - offset
slice_end = 36 - offset
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[24:36]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24:36]),
"min": min(stat["min"] for stat in imported_stats_5min[24:36]),
"change": imported_stats_5min[35]["sum"] - imported_stats_5min[23]["sum"],
"max": max(stat["max"] for stat in imported_stats_5min[slice_start:slice_end]),
"mean": fmean(
stat["mean"] for stat in imported_stats_5min[slice_start:slice_end]
),
"min": min(stat["min"] for stat in imported_stats_5min[slice_start:slice_end]),
"change": imported_stats_5min[slice_end - 1]["sum"]
- imported_stats_5min[slice_start - 1]["sum"],
}
# Test we can get only selected types
@@ -539,6 +591,167 @@ async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
}
@freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
async def test_statistic_during_period_hole(recorder_mock, hass, hass_ws_client):
"""Test statistic_during_period when there are holes in the data."""
id = 1
def next_id():
nonlocal id
id += 1
return id
now = dt_util.utcnow()
await async_recorder_block_till_done(hass)
client = await hass_ws_client()
zero = now
start = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=-18)
imported_stats = [
{
"start": (start + timedelta(hours=3 * i)),
"max": i * 2,
"mean": i,
"min": -76 + i * 2,
"sum": i,
}
for i in range(0, 6)
]
imported_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "recorder",
"statistic_id": "sensor.test",
"unit_of_measurement": "kWh",
}
recorder.get_instance(hass).async_import_statistics(
imported_metadata,
imported_stats,
Statistics,
)
await async_wait_recording_done(hass)
# This should include imported_stats[:]
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats[:]),
"mean": fmean(stat["mean"] for stat in imported_stats[:]),
"min": min(stat["min"] for stat in imported_stats[:]),
"change": imported_stats[-1]["sum"] - imported_stats[0]["sum"],
}
# This should also include imported_stats[:]
start_time = "2022-10-20T13:00:00+00:00"
end_time = "2022-10-21T05:00:00+00:00"
assert imported_stats[0]["start"].isoformat() == start_time
assert imported_stats[-1]["start"].isoformat() < end_time
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats[:]),
"mean": fmean(stat["mean"] for stat in imported_stats[:]),
"min": min(stat["min"] for stat in imported_stats[:]),
"change": imported_stats[-1]["sum"] - imported_stats[0]["sum"],
}
# This should also include imported_stats[:]
start_time = "2022-10-20T13:00:00+00:00"
end_time = "2022-10-21T08:20:00+00:00"
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats[:]),
"mean": fmean(stat["mean"] for stat in imported_stats[:]),
"min": min(stat["min"] for stat in imported_stats[:]),
"change": imported_stats[-1]["sum"] - imported_stats[0]["sum"],
}
# This should include imported_stats[1:4]
start_time = "2022-10-20T16:00:00+00:00"
end_time = "2022-10-20T23:00:00+00:00"
assert imported_stats[1]["start"].isoformat() == start_time
assert imported_stats[3]["start"].isoformat() < end_time
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats[1:4]),
"mean": fmean(stat["mean"] for stat in imported_stats[1:4]),
"min": min(stat["min"] for stat in imported_stats[1:4]),
"change": imported_stats[3]["sum"] - imported_stats[1]["sum"],
}
# This should also include imported_stats[1:4]
start_time = "2022-10-20T15:00:00+00:00"
end_time = "2022-10-21T00:00:00+00:00"
assert imported_stats[1]["start"].isoformat() > start_time
assert imported_stats[3]["start"].isoformat() < end_time
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats[1:4]),
"mean": fmean(stat["mean"] for stat in imported_stats[1:4]),
"min": min(stat["min"] for stat in imported_stats[1:4]),
"change": imported_stats[3]["sum"] - imported_stats[1]["sum"],
}
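The hole test above imports hourly statistics spaced three hours apart, so any window boundary that falls between rows lands in a gap; before this fix the exact-match lookup for the previous period found no row there and the computed change was wrong. With sum == i stored on row i, a window covering rows 1 through 3 should report a change of 2, as a quick illustrative check:

sums = [float(i) for i in range(6)]  # sum stored on each 3-hourly row
included = range(1, 4)  # rows 1, 2 and 3 fall inside the window
change = sums[included[-1]] - sums[included[0]]
assert change == 2.0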
@freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
@pytest.mark.parametrize(
"calendar_period, start_time, end_time",