Add WS API recorder/statistic_during_period (#80663)

Erik Montnemery 2022-10-27 21:51:09 +02:00 committed by GitHub
parent 588277623f
commit 68346599d2
3 changed files with 987 additions and 25 deletions


@@ -1113,6 +1113,377 @@ def _statistics_during_period_stmt_short_term(
return stmt


def _get_max_mean_min_statistic_in_sub_period(
session: Session,
result: dict[str, float],
start_time: datetime | None,
end_time: datetime | None,
table: type[Statistics | StatisticsShortTerm],
types: set[str],
metadata_id: int,
) -> None:
"""Return max, mean and min during the period."""
# Calculate max, mean, min
columns = []
if "max" in types:
columns.append(func.max(table.max))
if "mean" in types:
columns.append(func.avg(table.mean))
columns.append(func.count(table.mean))
if "min" in types:
columns.append(func.min(table.min))
stmt = lambda_stmt(lambda: select(columns).filter(table.metadata_id == metadata_id))
if start_time is not None:
stmt += lambda q: q.filter(table.start >= start_time)
if end_time is not None:
stmt += lambda q: q.filter(table.start < end_time)
stats = execute_stmt_lambda_element(session, stmt)
if "max" in types and stats and (new_max := stats[0].max) is not None:
old_max = result.get("max")
result["max"] = max(new_max, old_max) if old_max is not None else new_max
if "mean" in types and stats and stats[0].avg is not None:
duration = stats[0].count * table.duration.total_seconds()
result["duration"] = result.get("duration", 0.0) + duration
result["mean_acc"] = result.get("mean_acc", 0.0) + stats[0].avg * duration
if "min" in types and stats and (new_min := stats[0].min) is not None:
old_min = result.get("min")
result["min"] = min(new_min, old_min) if old_min is not None else new_min


def _get_max_mean_min_statistic(
session: Session,
head_start_time: datetime | None,
head_end_time: datetime | None,
main_start_time: datetime | None,
main_end_time: datetime | None,
tail_start_time: datetime | None,
tail_end_time: datetime | None,
tail_only: bool,
metadata_id: int,
types: set[str],
) -> dict[str, float | None]:
"""Return max, mean and min during the period.
The mean is a time weighted average, combining hourly and 5-minute statistics if
necessary.
"""
max_mean_min: dict[str, float] = {}
result: dict[str, float | None] = {}
if tail_start_time is not None:
# Calculate max, mean, min
_get_max_mean_min_statistic_in_sub_period(
session,
max_mean_min,
tail_start_time,
tail_end_time,
StatisticsShortTerm,
types,
metadata_id,
)
if not tail_only:
_get_max_mean_min_statistic_in_sub_period(
session,
max_mean_min,
main_start_time,
main_end_time,
Statistics,
types,
metadata_id,
)
if head_start_time is not None:
_get_max_mean_min_statistic_in_sub_period(
session,
max_mean_min,
head_start_time,
head_end_time,
StatisticsShortTerm,
types,
metadata_id,
)
if "max" in types:
result["max"] = max_mean_min.get("max")
if "mean" in types:
if "mean_acc" not in max_mean_min:
result["mean"] = None
else:
result["mean"] = max_mean_min["mean_acc"] / max_mean_min["duration"]
if "min" in types:
result["min"] = max_mean_min.get("min")
return result
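
The accumulator pattern above is easier to follow in isolation. The following
standalone sketch (illustrative only, not part of this commit) shows how the
time-weighted mean falls out of the "mean_acc" and "duration" entries:

# Sketch: each sub-period contributes avg * covered_seconds to the
# accumulator; the final mean divides by the total covered duration.
from datetime import timedelta

def time_weighted_mean(
    sub_periods: list[tuple[float, int, timedelta]]
) -> float | None:
    """Combine (avg, row_count, row_duration) results from several tables."""
    mean_acc = 0.0
    duration = 0.0
    for avg, count, row_duration in sub_periods:
        seconds = count * row_duration.total_seconds()
        mean_acc += avg * seconds
        duration += seconds
    return mean_acc / duration if duration else None

# One hourly row with mean 10.0 plus eleven 5-minute rows with mean 40.0:
# (10 * 3600 + 40 * 11 * 300) / (3600 + 3300) ≈ 24.35, not the naive 25.0.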


def _get_oldest_sum_statistic(
session: Session,
head_start_time: datetime | None,
main_start_time: datetime | None,
tail_start_time: datetime | None,
tail_only: bool,
metadata_id: int,
) -> float | None:
"""Return the oldest non-NULL sum during the period."""
def _get_oldest_sum_statistic_in_sub_period(
session: Session,
start_time: datetime | None,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
) -> tuple[float | None, datetime | None]:
"""Return the oldest non-NULL sum during the period."""
stmt = lambda_stmt(
lambda: select(table.sum, table.start)
.filter(table.metadata_id == metadata_id)
.filter(table.sum.is_not(None))
.order_by(table.start.asc())
.limit(1)
)
if start_time is not None:
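            # Look up the sum of the most recent period that starts before
            # start_time; it serves as the baseline when computing "change".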
start_time = start_time + table.duration - timedelta.resolution
if table == StatisticsShortTerm:
minutes = start_time.minute - start_time.minute % 5
period = start_time.replace(minute=minutes, second=0, microsecond=0)
else:
period = start_time.replace(minute=0, second=0, microsecond=0)
prev_period = period - table.duration
stmt += lambda q: q.filter(table.start == prev_period)
stats = execute_stmt_lambda_element(session, stmt)
return (
(stats[0].sum, process_timestamp(stats[0].start)) if stats else (None, None)
)
oldest_start: datetime | None
oldest_sum: float | None = None
if head_start_time is not None:
oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
session, head_start_time, StatisticsShortTerm, metadata_id
)
if (
oldest_start is not None
and oldest_start < head_start_time
and oldest_sum is not None
):
return oldest_sum
if not tail_only:
assert main_start_time is not None
oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
session, main_start_time, Statistics, metadata_id
)
if (
oldest_start is not None
and oldest_start < main_start_time
and oldest_sum is not None
):
return oldest_sum
return 0
if tail_start_time is not None:
oldest_sum, oldest_start = _get_oldest_sum_statistic_in_sub_period(
session, tail_start_time, StatisticsShortTerm, metadata_id
)
if (
oldest_start is not None
and oldest_start < tail_start_time
and oldest_sum is not None
):
return oldest_sum
return 0


def _get_newest_sum_statistic(
session: Session,
head_start_time: datetime | None,
head_end_time: datetime | None,
main_start_time: datetime | None,
main_end_time: datetime | None,
tail_start_time: datetime | None,
tail_end_time: datetime | None,
tail_only: bool,
metadata_id: int,
) -> float | None:
"""Return the newest non-NULL sum during the period."""
def _get_newest_sum_statistic_in_sub_period(
session: Session,
start_time: datetime | None,
end_time: datetime | None,
table: type[Statistics | StatisticsShortTerm],
metadata_id: int,
) -> float | None:
"""Return the newest non-NULL sum during the period."""
stmt = lambda_stmt(
lambda: select(
table.sum,
)
.filter(table.metadata_id == metadata_id)
.filter(table.sum.is_not(None))
.order_by(table.start.desc())
.limit(1)
)
if start_time is not None:
stmt += lambda q: q.filter(table.start >= start_time)
if end_time is not None:
stmt += lambda q: q.filter(table.start < end_time)
stats = execute_stmt_lambda_element(session, stmt)
return stats[0].sum if stats else None
newest_sum: float | None = None
if tail_start_time is not None:
newest_sum = _get_newest_sum_statistic_in_sub_period(
session, tail_start_time, tail_end_time, StatisticsShortTerm, metadata_id
)
if newest_sum is not None:
return newest_sum
if not tail_only:
newest_sum = _get_newest_sum_statistic_in_sub_period(
session, main_start_time, main_end_time, Statistics, metadata_id
)
if newest_sum is not None:
return newest_sum
if head_start_time is not None:
newest_sum = _get_newest_sum_statistic_in_sub_period(
session, head_start_time, head_end_time, StatisticsShortTerm, metadata_id
)
return newest_sum


def statistic_during_period(
hass: HomeAssistant,
start_time: datetime | None,
end_time: datetime | None,
statistic_id: str,
types: set[str] | None,
units: dict[str, str] | None,
) -> dict[str, Any]:
"""Return a statistic data point for the UTC period start_time - end_time."""
metadata = None
if not types:
types = {"max", "mean", "min", "change"}
result: dict[str, Any] = {}
    # To calculate the summary, data from the statistics (hourly) and
    # short_term_statistics (5-minute) tables is combined:
    # - The short term statistics table is used for the head and tail of the
    #   period if the period doesn't start or end on a full hour
    # - The statistics table is used for the remainder of the time
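    # - Example: for start_time 06:10 and end_time 08:20, the head is
    #   06:10 - 07:00 (5-minute rows), the main is 07:00 - 08:00 (hourly rows)
    #   and the tail is 08:00 - 08:20 (5-minute rows)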
now = dt_util.utcnow()
if end_time is not None and end_time > now:
end_time = now
tail_only = (
start_time is not None
and end_time is not None
and end_time - start_time < timedelta(hours=1)
)
# Calculate the head period
head_start_time: datetime | None = None
head_end_time: datetime | None = None
if not tail_only and start_time is not None and start_time.minute:
head_start_time = start_time
head_end_time = start_time.replace(
minute=0, second=0, microsecond=0
) + timedelta(hours=1)
# Calculate the tail period
tail_start_time: datetime | None = None
tail_end_time: datetime | None = None
if end_time is None:
tail_start_time = now.replace(minute=0, second=0, microsecond=0)
elif end_time.minute:
tail_start_time = (
start_time
if tail_only
else end_time.replace(minute=0, second=0, microsecond=0)
)
tail_end_time = end_time
# Calculate the main period
main_start_time: datetime | None = None
main_end_time: datetime | None = None
if not tail_only:
main_start_time = start_time if head_end_time is None else head_end_time
main_end_time = end_time if tail_start_time is None else tail_start_time
with session_scope(hass=hass) as session:
# Fetch metadata for the given statistic_id
metadata = get_metadata_with_session(session, statistic_ids=[statistic_id])
if not metadata:
return result
metadata_id = metadata[statistic_id][0]
if not types.isdisjoint({"max", "mean", "min"}):
result = _get_max_mean_min_statistic(
session,
head_start_time,
head_end_time,
main_start_time,
main_end_time,
tail_start_time,
tail_end_time,
tail_only,
metadata_id,
types,
)
if "change" in types:
oldest_sum: float | None
if start_time is None:
oldest_sum = 0.0
else:
oldest_sum = _get_oldest_sum_statistic(
session,
head_start_time,
main_start_time,
tail_start_time,
tail_only,
metadata_id,
)
newest_sum = _get_newest_sum_statistic(
session,
head_start_time,
head_end_time,
main_start_time,
main_end_time,
tail_start_time,
tail_end_time,
tail_only,
metadata_id,
)
# Calculate the difference between the oldest and newest sum
if oldest_sum is not None and newest_sum is not None:
result["change"] = newest_sum - oldest_sum
else:
result["change"] = None
def no_conversion(val: float | None) -> float | None:
"""Return val."""
return val
state_unit = unit = metadata[statistic_id][1]["unit_of_measurement"]
if state := hass.states.get(statistic_id):
state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
if unit is not None:
convert = _get_statistic_to_display_unit_converter(unit, state_unit, units)
else:
convert = no_conversion
return {key: convert(value) for key, value in result.items()}
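
For orientation, a call to the new function could look as follows (the
statistic id and the result values are hypothetical; this snippet is not part
of the commit):

# Returns a single aggregated data point for the whole period, already
# converted to the requested units:
result = statistic_during_period(
    hass,
    start_time=dt_util.parse_datetime("2022-10-21T06:10:00+00:00"),
    end_time=None,
    statistic_id="sensor.energy_total",  # hypothetical statistic id
    types={"mean", "change"},
    units={"energy": "MWh"},
)
# e.g. {"mean": 0.03, "change": 0.014}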


def statistics_during_period(
hass: HomeAssistant,
start_time: datetime,
@@ -1122,7 +1493,7 @@ def statistics_during_period(
start_time_as_datetime: bool = False,
units: dict[str, str] | None = None,
) -> dict[str, list[dict[str, Any]]]:
"""Return statistics during UTC period start_time - end_time for the statistic_ids.
"""Return statistic data points during UTC period start_time - end_time.
If end_time is omitted, returns statistics newer than or equal to start_time.
If statistic_ids is omitted, returns statistics for all statistics ids.


@@ -1,7 +1,7 @@
"""The Recorder websocket API."""
from __future__ import annotations
from datetime import datetime as dt
from datetime import datetime as dt, timedelta
import logging
from typing import Any, Literal
@@ -10,6 +10,7 @@ import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api import messages
from homeassistant.core import HomeAssistant, callback, valid_entity_id
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.json import JSON_DUMP
from homeassistant.util import dt as dt_util
@@ -31,6 +32,7 @@ from .statistics import (
async_change_statistics_unit,
async_import_statistics,
list_statistic_ids,
statistic_during_period,
statistics_during_period,
validate_statistics,
)
@@ -47,6 +49,7 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_backup_start)
websocket_api.async_register_command(hass, ws_change_statistics_unit)
websocket_api.async_register_command(hass, ws_clear_statistics)
websocket_api.async_register_command(hass, ws_get_statistic_during_period)
websocket_api.async_register_command(hass, ws_get_statistics_during_period)
websocket_api.async_register_command(hass, ws_get_statistics_metadata)
websocket_api.async_register_command(hass, ws_list_statistic_ids)
@@ -56,6 +59,149 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_validate_statistics)


def _ws_get_statistic_during_period(
hass: HomeAssistant,
msg_id: int,
start_time: dt | None,
end_time: dt | None,
statistic_id: str,
types: set[str] | None,
units: dict[str, str],
) -> str:
"""Fetch statistics and convert them to json in the executor."""
return JSON_DUMP(
messages.result_message(
msg_id,
statistic_during_period(
hass, start_time, end_time, statistic_id, types, units=units
),
)
)


@websocket_api.websocket_command(
{
vol.Required("type"): "recorder/statistic_during_period",
vol.Exclusive("calendar", "period"): vol.Schema(
{
vol.Required("period"): vol.Any("hour", "day", "week", "month", "year"),
vol.Optional("offset"): int,
}
),
vol.Exclusive("fixed_period", "period"): vol.Schema(
{
vol.Optional("start_time"): str,
vol.Optional("end_time"): str,
}
),
vol.Exclusive("rolling_window", "period"): vol.Schema(
{
vol.Required("duration"): cv.time_period_dict,
vol.Optional("offset"): cv.time_period_dict,
}
),
vol.Optional("statistic_id"): str,
vol.Optional("types"): vol.All([str], vol.Coerce(set)),
vol.Optional("units"): vol.Schema(
{
vol.Optional("distance"): vol.In(DistanceConverter.VALID_UNITS),
vol.Optional("energy"): vol.In(EnergyConverter.VALID_UNITS),
vol.Optional("mass"): vol.In(MassConverter.VALID_UNITS),
vol.Optional("power"): vol.In(PowerConverter.VALID_UNITS),
vol.Optional("pressure"): vol.In(PressureConverter.VALID_UNITS),
vol.Optional("speed"): vol.In(SpeedConverter.VALID_UNITS),
vol.Optional("temperature"): vol.In(TemperatureConverter.VALID_UNITS),
vol.Optional("volume"): vol.In(VolumeConverter.VALID_UNITS),
}
),
}
)
@websocket_api.async_response
async def ws_get_statistic_during_period(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any]
) -> None:
"""Handle statistics websocket command."""
if ("start_time" in msg or "end_time" in msg) and "duration" in msg:
raise HomeAssistantError
if "offset" in msg and "duration" not in msg:
raise HomeAssistantError
start_time = None
end_time = None
if "calendar" in msg:
calendar_period = msg["calendar"]["period"]
start_of_day = dt_util.start_of_local_day()
offset = msg["calendar"].get("offset", 0)
if calendar_period == "hour":
start_time = dt_util.now().replace(minute=0, second=0, microsecond=0)
start_time += timedelta(hours=offset)
end_time = start_time + timedelta(hours=1)
elif calendar_period == "day":
start_time = start_of_day
start_time += timedelta(days=offset)
end_time = start_time + timedelta(days=1)
elif calendar_period == "week":
start_time = start_of_day - timedelta(days=start_of_day.weekday())
start_time += timedelta(days=offset * 7)
end_time = start_time + timedelta(weeks=1)
elif calendar_period == "month":
start_time = start_of_day.replace(day=28)
# This works for up to 48 months of offset
start_time = (start_time + timedelta(days=offset * 31)).replace(day=1)
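            # (Adding 31 days per month of offset differs from the real month
            # lengths by roughly 0.6 days per month on average; starting from
            # day 28 leaves ~27 days of slack before .replace(day=1) picks the
            # wrong month, hence the ~48-month limit noted above.)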
end_time = (start_time + timedelta(days=31)).replace(day=1)
else: # calendar_period = "year"
start_time = start_of_day.replace(month=12, day=31)
# This works for 100+ years of offset
start_time = (start_time + timedelta(days=offset * 366)).replace(
month=1, day=1
)
            end_time = (start_time + timedelta(days=366)).replace(day=1)
start_time = dt_util.as_utc(start_time)
end_time = dt_util.as_utc(end_time)
elif "fixed_period" in msg:
if start_time_str := msg["fixed_period"].get("start_time"):
if start_time := dt_util.parse_datetime(start_time_str):
start_time = dt_util.as_utc(start_time)
else:
connection.send_error(
msg["id"], "invalid_start_time", "Invalid start_time"
)
return
if end_time_str := msg["fixed_period"].get("end_time"):
if end_time := dt_util.parse_datetime(end_time_str):
end_time = dt_util.as_utc(end_time)
else:
connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
return
elif "rolling_window" in msg:
duration = msg["rolling_window"]["duration"]
now = dt_util.utcnow()
start_time = now - duration
end_time = start_time + duration
if offset := msg["rolling_window"].get("offset"):
start_time += offset
end_time += offset
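        # Example: a duration of 1 h with an offset of -25 min covers the
        # window from now - 1 h 25 min to now - 25 min.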
connection.send_message(
await get_instance(hass).async_add_executor_job(
_ws_get_statistic_during_period,
hass,
msg["id"],
start_time,
end_time,
msg.get("statistic_id"),
msg.get("types"),
msg.get("units"),
)
)
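
A client request for the new command could look like this (the statistic id is
illustrative):

{
    "id": 1,
    "type": "recorder/statistic_during_period",
    "rolling_window": {"duration": {"hours": 24}},
    "statistic_id": "sensor.energy_total",
    "types": ["min", "max", "change"],
    "units": {"energy": "kWh"},
}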


def _ws_get_statistics_during_period(
hass: HomeAssistant,
msg_id: int,


@@ -1,14 +1,17 @@
"""The tests for sensor recorder platform."""
# pylint: disable=protected-access,invalid-name
import datetime
from datetime import timedelta
from statistics import fmean
import threading
from unittest.mock import patch
from unittest.mock import ANY, patch
from freezegun import freeze_time
import pytest
from pytest import approx
from homeassistant.components import recorder
from homeassistant.components.recorder.db_schema import Statistics, StatisticsShortTerm
from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
@@ -178,6 +181,448 @@ async def test_statistics_during_period(recorder_mock, hass, hass_ws_client):
}


@freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
async def test_statistic_during_period(recorder_mock, hass, hass_ws_client):
"""Test statistic_during_period."""
id = 1
def next_id():
nonlocal id
id += 1
return id
now = dt_util.utcnow()
await async_recorder_block_till_done(hass)
client = await hass_ws_client()
zero = now
start = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=-3)
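    # Generate 39 five-minute rows covering 04:00 - 07:15 UTC (now is 07:25)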
imported_stats_5min = [
{
"start": (start + timedelta(minutes=5 * i)),
"max": i * 2,
"mean": i,
"min": -76 + i * 2,
"sum": i,
}
for i in range(0, 39)
]
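    # Aggregate each run of 12 five-minute rows into one hourly row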
imported_stats = [
{
"start": imported_stats_5min[i * 12]["start"],
"max": max(
stat["max"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
),
"mean": fmean(
stat["mean"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
),
"min": min(
stat["min"] for stat in imported_stats_5min[i * 12 : (i + 1) * 12]
),
"sum": imported_stats_5min[i * 12 + 11]["sum"],
}
for i in range(0, 3)
]
imported_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "recorder",
"statistic_id": "sensor.test",
"unit_of_measurement": "kWh",
}
recorder.get_instance(hass).async_import_statistics(
imported_metadata,
imported_stats,
Statistics,
)
recorder.get_instance(hass).async_import_statistics(
imported_metadata,
imported_stats_5min,
StatisticsShortTerm,
)
await async_wait_recording_done(hass)
# No data for this period yet
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"fixed_period": {
"start_time": now.isoformat(),
"end_time": now.isoformat(),
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": None,
"mean": None,
"min": None,
"change": None,
}
    # This should include imported_stats_5min[:]
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[:]),
"min": min(stat["min"] for stat in imported_stats_5min[:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"],
}
    # This should also include imported_stats_5min[:]
start_time = "2022-10-21T04:00:00+00:00"
end_time = "2022-10-21T07:15:00+00:00"
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[:]),
"min": min(stat["min"] for stat in imported_stats_5min[:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"],
}
    # This should also include imported_stats_5min[:]
start_time = "2022-10-20T04:00:00+00:00"
end_time = "2022-10-21T08:20:00+00:00"
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[:]),
"min": min(stat["min"] for stat in imported_stats_5min[:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"],
}
    # This should include imported_stats_5min[26:]
start_time = "2022-10-21T06:10:00+00:00"
assert imported_stats_5min[26]["start"].isoformat() == start_time
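    # The expected "change" below subtracts imported_stats_5min[25]["sum"],
    # i.e. the sum of the last row before the requested period.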
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"fixed_period": {
"start_time": start_time,
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[26:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[26:]),
"min": min(stat["min"] for stat in imported_stats_5min[26:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[25]["sum"],
}
    # This should also include imported_stats_5min[26:]
start_time = "2022-10-21T06:09:00+00:00"
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"fixed_period": {
"start_time": start_time,
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[26:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[26:]),
"min": min(stat["min"] for stat in imported_stats_5min[26:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[25]["sum"],
}
    # This should include imported_stats_5min[:26]
end_time = "2022-10-21T06:10:00+00:00"
assert imported_stats_5min[26]["start"].isoformat() == end_time
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"fixed_period": {
"end_time": end_time,
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:26]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[:26]),
"min": min(stat["min"] for stat in imported_stats_5min[:26]),
"change": imported_stats_5min[25]["sum"] - 0,
}
    # This should include imported_stats_5min[26:32] (less than a full hour)
start_time = "2022-10-21T06:10:00+00:00"
assert imported_stats_5min[26]["start"].isoformat() == start_time
end_time = "2022-10-21T06:40:00+00:00"
assert imported_stats_5min[32]["start"].isoformat() == end_time
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"fixed_period": {
"start_time": start_time,
"end_time": end_time,
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[26:32]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[26:32]),
"min": min(stat["min"] for stat in imported_stats_5min[26:32]),
"change": imported_stats_5min[31]["sum"] - imported_stats_5min[25]["sum"],
}
    # This should include imported_stats[2:] + imported_stats_5min[36:]
start_time = "2022-10-21T06:00:00+00:00"
assert imported_stats_5min[24]["start"].isoformat() == start_time
assert imported_stats[2]["start"].isoformat() == start_time
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"fixed_period": {
"start_time": start_time,
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[24:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24:]),
"min": min(stat["min"] for stat in imported_stats_5min[24:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[23]["sum"],
}
    # This should also include imported_stats[2:] + imported_stats_5min[36:]
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"rolling_window": {
"duration": {"hours": 1, "minutes": 25},
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[24:]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24:]),
"min": min(stat["min"] for stat in imported_stats_5min[24:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[23]["sum"],
}
    # This should include imported_stats[2:3]
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"rolling_window": {
"duration": {"hours": 1},
"offset": {"minutes": -25},
},
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[24:36]),
"mean": fmean(stat["mean"] for stat in imported_stats_5min[24:36]),
"min": min(stat["min"] for stat in imported_stats_5min[24:36]),
"change": imported_stats_5min[35]["sum"] - imported_stats_5min[23]["sum"],
}
# Test we can get only selected types
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"types": ["max", "change"],
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:]),
"change": imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"],
}
# Test we can convert units
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
"units": {"energy": "MWh"},
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:]) / 1000,
"mean": fmean(stat["mean"] for stat in imported_stats_5min[:]) / 1000,
"min": min(stat["min"] for stat in imported_stats_5min[:]) / 1000,
"change": (imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"])
/ 1000,
}
# Test we can automatically convert units
hass.states.async_set("sensor.test", None, attributes=ENERGY_SENSOR_WH_ATTRIBUTES)
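    # The state unit is now Wh while the statistics are stored in kWh, so all
    # expected values scale by a factor of 1000.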
await client.send_json(
{
"id": next_id(),
"type": "recorder/statistic_during_period",
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == {
"max": max(stat["max"] for stat in imported_stats_5min[:]) * 1000,
"mean": fmean(stat["mean"] for stat in imported_stats_5min[:]) * 1000,
"min": min(stat["min"] for stat in imported_stats_5min[:]) * 1000,
"change": (imported_stats_5min[-1]["sum"] - imported_stats_5min[0]["sum"])
* 1000,
}


@freeze_time(datetime.datetime(2022, 10, 21, 7, 25, tzinfo=datetime.timezone.utc))
@pytest.mark.parametrize(
"calendar_period, start_time, end_time",
(
(
{"period": "hour"},
"2022-10-21T07:00:00+00:00",
"2022-10-21T08:00:00+00:00",
),
(
{"period": "hour", "offset": -1},
"2022-10-21T06:00:00+00:00",
"2022-10-21T07:00:00+00:00",
),
(
{"period": "day"},
"2022-10-21T07:00:00+00:00",
"2022-10-22T07:00:00+00:00",
),
(
{"period": "day", "offset": -1},
"2022-10-20T07:00:00+00:00",
"2022-10-21T07:00:00+00:00",
),
(
{"period": "week"},
"2022-10-17T07:00:00+00:00",
"2022-10-24T07:00:00+00:00",
),
(
{"period": "week", "offset": -1},
"2022-10-10T07:00:00+00:00",
"2022-10-17T07:00:00+00:00",
),
(
{"period": "month"},
"2022-10-01T07:00:00+00:00",
"2022-11-01T07:00:00+00:00",
),
(
{"period": "month", "offset": -1},
"2022-09-01T07:00:00+00:00",
"2022-10-01T07:00:00+00:00",
),
(
{"period": "year"},
"2022-01-01T08:00:00+00:00",
"2023-01-01T08:00:00+00:00",
),
(
{"period": "year", "offset": -1},
"2021-01-01T08:00:00+00:00",
"2022-01-01T08:00:00+00:00",
),
),
)
async def test_statistic_during_period_calendar(
recorder_mock, hass, hass_ws_client, calendar_period, start_time, end_time
):
"""Test statistic_during_period."""
client = await hass_ws_client()
    # Request data for the parametrized calendar period
with patch(
"homeassistant.components.recorder.websocket_api.statistic_during_period",
return_value={},
) as statistic_during_period:
await client.send_json(
{
"id": 1,
"type": "recorder/statistic_during_period",
"calendar": calendar_period,
"statistic_id": "sensor.test",
}
)
response = await client.receive_json()
statistic_during_period.assert_called_once_with(
hass, ANY, ANY, "sensor.test", None, units=None
)
assert statistic_during_period.call_args[0][1].isoformat() == start_time
assert statistic_during_period.call_args[0][2].isoformat() == end_time
assert response["success"]


@pytest.mark.parametrize(
"attributes, state, value, custom_units, converted_value",
[
@@ -1595,20 +2040,20 @@ async def test_import_statistics(
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2)
external_statistics1 = {
imported_statistics1 = {
"start": period1.isoformat(),
"last_reset": None,
"state": 0,
"sum": 2,
}
external_statistics2 = {
imported_statistics2 = {
"start": period2.isoformat(),
"last_reset": None,
"state": 1,
"sum": 3,
}
external_metadata = {
imported_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
@@ -1621,8 +2066,8 @@ async def test_import_statistics(
{
"id": 1,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics1, external_statistics2],
"metadata": imported_metadata,
"stats": [imported_statistics1, imported_statistics2],
}
)
response = await client.receive_json()
@@ -1712,7 +2157,7 @@ async def test_import_statistics(
{
"id": 2,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"metadata": imported_metadata,
"stats": [external_statistics],
}
)
@@ -1764,7 +2209,7 @@ async def test_import_statistics(
{
"id": 3,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"metadata": imported_metadata,
"stats": [external_statistics],
}
)
@@ -1822,20 +2267,20 @@ async def test_adjust_sum_statistics_energy(
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2)
external_statistics1 = {
imported_statistics1 = {
"start": period1.isoformat(),
"last_reset": None,
"state": 0,
"sum": 2,
}
external_statistics2 = {
imported_statistics2 = {
"start": period2.isoformat(),
"last_reset": None,
"state": 1,
"sum": 3,
}
external_metadata = {
imported_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
@@ -1848,8 +2293,8 @@ async def test_adjust_sum_statistics_energy(
{
"id": 1,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics1, external_statistics2],
"metadata": imported_metadata,
"stats": [imported_statistics1, imported_statistics2],
}
)
response = await client.receive_json()
@@ -2018,20 +2463,20 @@ async def test_adjust_sum_statistics_gas(
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2)
external_statistics1 = {
imported_statistics1 = {
"start": period1.isoformat(),
"last_reset": None,
"state": 0,
"sum": 2,
}
external_statistics2 = {
imported_statistics2 = {
"start": period2.isoformat(),
"last_reset": None,
"state": 1,
"sum": 3,
}
external_metadata = {
imported_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
@@ -2044,8 +2489,8 @@ async def test_adjust_sum_statistics_gas(
{
"id": 1,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics1, external_statistics2],
"metadata": imported_metadata,
"stats": [imported_statistics1, imported_statistics2],
}
)
response = await client.receive_json()
@@ -2229,20 +2674,20 @@ async def test_adjust_sum_statistics_errors(
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2)
external_statistics1 = {
imported_statistics1 = {
"start": period1.isoformat(),
"last_reset": None,
"state": 0,
"sum": 2,
}
external_statistics2 = {
imported_statistics2 = {
"start": period2.isoformat(),
"last_reset": None,
"state": 1,
"sum": 3,
}
external_metadata = {
imported_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
@@ -2255,8 +2700,8 @@ async def test_adjust_sum_statistics_errors(
{
"id": 1,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics1, external_statistics2],
"metadata": imported_metadata,
"stats": [imported_statistics1, imported_statistics2],
}
)
response = await client.receive_json()