Add websocket command recorder/import_statistics (#73937)

* Expose ws_add_external_statistics in websocket API

* Refactor

* Add tests

* Improve test coverage

Co-authored-by: Thibault Cohen <titilambert@users.noreply.github.com>
Co-authored-by: Erik <erik@montnemery.com>
This commit is contained in:
Thibault Cohen 2022-07-21 06:36:49 -04:00 committed by GitHub
parent 41f6383957
commit 1d7d2875e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 547 additions and 69 deletions

View File

@ -74,7 +74,7 @@ from .tasks import (
CommitTask, CommitTask,
DatabaseLockTask, DatabaseLockTask,
EventTask, EventTask,
ExternalStatisticsTask, ImportStatisticsTask,
KeepAliveTask, KeepAliveTask,
PerodicCleanupTask, PerodicCleanupTask,
PurgeTask, PurgeTask,
@ -480,11 +480,11 @@ class Recorder(threading.Thread):
) )
@callback @callback
def async_external_statistics( def async_import_statistics(
self, metadata: StatisticMetaData, stats: Iterable[StatisticData] self, metadata: StatisticMetaData, stats: Iterable[StatisticData]
) -> None: ) -> None:
"""Schedule external statistics.""" """Schedule import of statistics."""
self.queue_task(ExternalStatisticsTask(metadata, stats)) self.queue_task(ImportStatisticsTask(metadata, stats))
@callback @callback
def _async_setup_periodic_tasks(self) -> None: def _async_setup_periodic_tasks(self) -> None:

View File

@ -29,7 +29,7 @@ from homeassistant.const import (
VOLUME_CUBIC_FEET, VOLUME_CUBIC_FEET,
VOLUME_CUBIC_METERS, VOLUME_CUBIC_METERS,
) )
from homeassistant.core import Event, HomeAssistant, callback from homeassistant.core import Event, HomeAssistant, callback, valid_entity_id
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry from homeassistant.helpers import entity_registry
from homeassistant.helpers.json import JSONEncoder from homeassistant.helpers.json import JSONEncoder
@ -1360,6 +1360,54 @@ def _statistics_exists(
return result["id"] if result else None return result["id"] if result else None
@callback
def _async_import_statistics(
hass: HomeAssistant,
metadata: StatisticMetaData,
statistics: Iterable[StatisticData],
) -> None:
"""Validate timestamps and insert an import_statistics job in the recorder's queue."""
for statistic in statistics:
start = statistic["start"]
if start.tzinfo is None or start.tzinfo.utcoffset(start) is None:
raise HomeAssistantError("Naive timestamp")
if start.minute != 0 or start.second != 0 or start.microsecond != 0:
raise HomeAssistantError("Invalid timestamp")
statistic["start"] = dt_util.as_utc(start)
if "last_reset" in statistic and statistic["last_reset"] is not None:
last_reset = statistic["last_reset"]
if (
last_reset.tzinfo is None
or last_reset.tzinfo.utcoffset(last_reset) is None
):
raise HomeAssistantError("Naive timestamp")
statistic["last_reset"] = dt_util.as_utc(last_reset)
# Insert job in recorder's queue
hass.data[DATA_INSTANCE].async_import_statistics(metadata, statistics)
@callback
def async_import_statistics(
hass: HomeAssistant,
metadata: StatisticMetaData,
statistics: Iterable[StatisticData],
) -> None:
"""Import hourly statistics from an internal source.
This inserts an import_statistics job in the recorder's queue.
"""
if not valid_entity_id(metadata["statistic_id"]):
raise HomeAssistantError("Invalid statistic_id")
# The source must not be empty and must be aligned with the statistic_id
if not metadata["source"] or metadata["source"] != DOMAIN:
raise HomeAssistantError("Invalid source")
_async_import_statistics(hass, metadata, statistics)
@callback @callback
def async_add_external_statistics( def async_add_external_statistics(
hass: HomeAssistant, hass: HomeAssistant,
@ -1368,7 +1416,7 @@ def async_add_external_statistics(
) -> None: ) -> None:
"""Add hourly statistics from an external source. """Add hourly statistics from an external source.
This inserts an add_external_statistics job in the recorder's queue. This inserts an import_statistics job in the recorder's queue.
""" """
# The statistic_id has same limitations as an entity_id, but with a ':' as separator # The statistic_id has same limitations as an entity_id, but with a ':' as separator
if not valid_statistic_id(metadata["statistic_id"]): if not valid_statistic_id(metadata["statistic_id"]):
@ -1379,16 +1427,7 @@ def async_add_external_statistics(
if not metadata["source"] or metadata["source"] != domain: if not metadata["source"] or metadata["source"] != domain:
raise HomeAssistantError("Invalid source") raise HomeAssistantError("Invalid source")
for statistic in statistics: _async_import_statistics(hass, metadata, statistics)
start = statistic["start"]
if start.tzinfo is None or start.tzinfo.utcoffset(start) is None:
raise HomeAssistantError("Naive timestamp")
if start.minute != 0 or start.second != 0 or start.microsecond != 0:
raise HomeAssistantError("Invalid timestamp")
statistic["start"] = dt_util.as_utc(start)
# Insert job in recorder's queue
hass.data[DATA_INSTANCE].async_external_statistics(metadata, statistics)
def _filter_unique_constraint_integrity_error( def _filter_unique_constraint_integrity_error(
@ -1432,12 +1471,12 @@ def _filter_unique_constraint_integrity_error(
@retryable_database_job("statistics") @retryable_database_job("statistics")
def add_external_statistics( def import_statistics(
instance: Recorder, instance: Recorder,
metadata: StatisticMetaData, metadata: StatisticMetaData,
statistics: Iterable[StatisticData], statistics: Iterable[StatisticData],
) -> bool: ) -> bool:
"""Process an add_external_statistics job.""" """Process an import_statistics job."""
with session_scope( with session_scope(
session=instance.get_session(), session=instance.get_session(),

View File

@ -124,18 +124,18 @@ class StatisticsTask(RecorderTask):
@dataclass @dataclass
class ExternalStatisticsTask(RecorderTask): class ImportStatisticsTask(RecorderTask):
"""An object to insert into the recorder queue to run an external statistics task.""" """An object to insert into the recorder queue to run an import statistics task."""
metadata: StatisticMetaData metadata: StatisticMetaData
statistics: Iterable[StatisticData] statistics: Iterable[StatisticData]
def run(self, instance: Recorder) -> None: def run(self, instance: Recorder) -> None:
"""Run statistics task.""" """Run statistics task."""
if statistics.add_external_statistics(instance, self.metadata, self.statistics): if statistics.import_statistics(instance, self.metadata, self.statistics):
return return
# Schedule a new statistics task if this one didn't finish # Schedule a new statistics task if this one didn't finish
instance.queue_task(ExternalStatisticsTask(self.metadata, self.statistics)) instance.queue_task(ImportStatisticsTask(self.metadata, self.statistics))
@dataclass @dataclass

View File

@ -7,11 +7,17 @@ from typing import TYPE_CHECKING
import voluptuous as vol import voluptuous as vol
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback, valid_entity_id
from homeassistant.helpers import config_validation as cv
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from .const import DATA_INSTANCE, MAX_QUEUE_BACKLOG from .const import DATA_INSTANCE, MAX_QUEUE_BACKLOG
from .statistics import list_statistic_ids, validate_statistics from .statistics import (
async_add_external_statistics,
async_import_statistics,
list_statistic_ids,
validate_statistics,
)
from .util import async_migration_in_progress from .util import async_migration_in_progress
if TYPE_CHECKING: if TYPE_CHECKING:
@ -31,6 +37,7 @@ def async_setup(hass: HomeAssistant) -> None:
websocket_api.async_register_command(hass, ws_backup_start) websocket_api.async_register_command(hass, ws_backup_start)
websocket_api.async_register_command(hass, ws_backup_end) websocket_api.async_register_command(hass, ws_backup_end)
websocket_api.async_register_command(hass, ws_adjust_sum_statistics) websocket_api.async_register_command(hass, ws_adjust_sum_statistics)
websocket_api.async_register_command(hass, ws_import_statistics)
@websocket_api.websocket_command( @websocket_api.websocket_command(
@ -136,6 +143,46 @@ def ws_adjust_sum_statistics(
connection.send_result(msg["id"]) connection.send_result(msg["id"])
@websocket_api.require_admin
@websocket_api.websocket_command(
{
vol.Required("type"): "recorder/import_statistics",
vol.Required("metadata"): {
vol.Required("has_mean"): bool,
vol.Required("has_sum"): bool,
vol.Required("name"): vol.Any(str, None),
vol.Required("source"): str,
vol.Required("statistic_id"): str,
vol.Required("unit_of_measurement"): vol.Any(str, None),
},
vol.Required("stats"): [
{
vol.Required("start"): cv.datetime,
vol.Optional("mean"): vol.Any(float, int),
vol.Optional("min"): vol.Any(float, int),
vol.Optional("max"): vol.Any(float, int),
vol.Optional("last_reset"): vol.Any(cv.datetime, None),
vol.Optional("state"): vol.Any(float, int),
vol.Optional("sum"): vol.Any(float, int),
}
],
}
)
@callback
def ws_import_statistics(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
"""Adjust sum statistics."""
metadata = msg["metadata"]
stats = msg["stats"]
if valid_entity_id(metadata["statistic_id"]):
async_import_statistics(hass, metadata, stats)
else:
async_add_external_statistics(hass, metadata, stats)
connection.send_result(msg["id"])
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): "recorder/info", vol.Required("type"): "recorder/info",

View File

@ -17,6 +17,7 @@ from homeassistant.components.recorder.db_schema import StatisticsShortTerm
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
from homeassistant.components.recorder.statistics import ( from homeassistant.components.recorder.statistics import (
async_add_external_statistics, async_add_external_statistics,
async_import_statistics,
delete_statistics_duplicates, delete_statistics_duplicates,
delete_statistics_meta_duplicates, delete_statistics_meta_duplicates,
get_last_short_term_statistics, get_last_short_term_statistics,
@ -437,26 +438,45 @@ def test_statistics_duplicated(hass_recorder, caplog):
caplog.clear() caplog.clear()
async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog): @pytest.mark.parametrize("last_reset_str", ("2022-01-01T00:00:00+02:00", None))
"""Test inserting external statistics.""" @pytest.mark.parametrize(
"source, statistic_id, import_fn",
(
("test", "test:total_energy_import", async_add_external_statistics),
("recorder", "sensor.total_energy_import", async_import_statistics),
),
)
async def test_import_statistics(
hass,
hass_ws_client,
recorder_mock,
caplog,
source,
statistic_id,
import_fn,
last_reset_str,
):
"""Test importing statistics and inserting external statistics."""
client = await hass_ws_client() client = await hass_ws_client()
assert "Compiling statistics for" not in caplog.text assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow() zero = dt_util.utcnow()
last_reset = dt_util.parse_datetime(last_reset_str) if last_reset_str else None
last_reset_utc_str = dt_util.as_utc(last_reset).isoformat() if last_reset else None
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2) period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2)
external_statistics1 = { external_statistics1 = {
"start": period1, "start": period1,
"last_reset": None, "last_reset": last_reset,
"state": 0, "state": 0,
"sum": 2, "sum": 2,
} }
external_statistics2 = { external_statistics2 = {
"start": period2, "start": period2,
"last_reset": None, "last_reset": last_reset,
"state": 1, "state": 1,
"sum": 3, "sum": 3,
} }
@ -465,37 +485,35 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
"has_mean": False, "has_mean": False,
"has_sum": True, "has_sum": True,
"name": "Total imported energy", "name": "Total imported energy",
"source": "test", "source": source,
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"unit_of_measurement": "kWh", "unit_of_measurement": "kWh",
} }
async_add_external_statistics( import_fn(hass, external_metadata, (external_statistics1, external_statistics2))
hass, external_metadata, (external_statistics1, external_statistics2)
)
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour") stats = statistics_during_period(hass, zero, period="hour")
assert stats == { assert stats == {
"test:total_energy_import": [ statistic_id: [
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period1.isoformat(), "start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(), "end": (period1 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
"mean": None, "mean": None,
"min": None, "min": None,
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(0.0), "state": approx(0.0),
"sum": approx(2.0), "sum": approx(2.0),
}, },
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period2.isoformat(), "start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(), "end": (period2 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
"mean": None, "mean": None,
"min": None, "min": None,
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(1.0), "state": approx(1.0),
"sum": approx(3.0), "sum": approx(3.0),
}, },
@ -506,37 +524,37 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
{ {
"has_mean": False, "has_mean": False,
"has_sum": True, "has_sum": True,
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"name": "Total imported energy", "name": "Total imported energy",
"source": "test", "source": source,
"unit_of_measurement": "kWh", "unit_of_measurement": "kWh",
} }
] ]
metadata = get_metadata(hass, statistic_ids=("test:total_energy_import",)) metadata = get_metadata(hass, statistic_ids=(statistic_id,))
assert metadata == { assert metadata == {
"test:total_energy_import": ( statistic_id: (
1, 1,
{ {
"has_mean": False, "has_mean": False,
"has_sum": True, "has_sum": True,
"name": "Total imported energy", "name": "Total imported energy",
"source": "test", "source": source,
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"unit_of_measurement": "kWh", "unit_of_measurement": "kWh",
}, },
) )
} }
last_stats = get_last_statistics(hass, 1, "test:total_energy_import", True) last_stats = get_last_statistics(hass, 1, statistic_id, True)
assert last_stats == { assert last_stats == {
"test:total_energy_import": [ statistic_id: [
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period2.isoformat(), "start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(), "end": (period2 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
"mean": None, "mean": None,
"min": None, "min": None,
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(1.0), "state": approx(1.0),
"sum": approx(3.0), "sum": approx(3.0),
}, },
@ -550,13 +568,13 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
"state": 5, "state": 5,
"sum": 6, "sum": 6,
} }
async_add_external_statistics(hass, external_metadata, (external_statistics,)) import_fn(hass, external_metadata, (external_statistics,))
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour") stats = statistics_during_period(hass, zero, period="hour")
assert stats == { assert stats == {
"test:total_energy_import": [ statistic_id: [
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period1.isoformat(), "start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(), "end": (period1 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
@ -567,13 +585,13 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
"sum": approx(6.0), "sum": approx(6.0),
}, },
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period2.isoformat(), "start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(), "end": (period2 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
"mean": None, "mean": None,
"min": None, "min": None,
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(1.0), "state": approx(1.0),
"sum": approx(3.0), "sum": approx(3.0),
}, },
@ -586,34 +604,34 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
"max": 1, "max": 1,
"mean": 2, "mean": 2,
"min": 3, "min": 3,
"last_reset": None, "last_reset": last_reset,
"state": 4, "state": 4,
"sum": 5, "sum": 5,
} }
async_add_external_statistics(hass, external_metadata, (external_statistics,)) import_fn(hass, external_metadata, (external_statistics,))
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour") stats = statistics_during_period(hass, zero, period="hour")
assert stats == { assert stats == {
"test:total_energy_import": [ statistic_id: [
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period1.isoformat(), "start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(), "end": (period1 + timedelta(hours=1)).isoformat(),
"max": approx(1.0), "max": approx(1.0),
"mean": approx(2.0), "mean": approx(2.0),
"min": approx(3.0), "min": approx(3.0),
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(4.0), "state": approx(4.0),
"sum": approx(5.0), "sum": approx(5.0),
}, },
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period2.isoformat(), "start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(), "end": (period2 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
"mean": None, "mean": None,
"min": None, "min": None,
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(1.0), "state": approx(1.0),
"sum": approx(3.0), "sum": approx(3.0),
}, },
@ -624,7 +642,7 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
{ {
"id": 1, "id": 1,
"type": "recorder/adjust_sum_statistics", "type": "recorder/adjust_sum_statistics",
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start_time": period2.isoformat(), "start_time": period2.isoformat(),
"adjustment": 1000.0, "adjustment": 1000.0,
} }
@ -635,26 +653,26 @@ async def test_external_statistics(hass, hass_ws_client, recorder_mock, caplog):
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour") stats = statistics_during_period(hass, zero, period="hour")
assert stats == { assert stats == {
"test:total_energy_import": [ statistic_id: [
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period1.isoformat(), "start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(), "end": (period1 + timedelta(hours=1)).isoformat(),
"max": approx(1.0), "max": approx(1.0),
"mean": approx(2.0), "mean": approx(2.0),
"min": approx(3.0), "min": approx(3.0),
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(4.0), "state": approx(4.0),
"sum": approx(5.0), "sum": approx(5.0),
}, },
{ {
"statistic_id": "test:total_energy_import", "statistic_id": statistic_id,
"start": period2.isoformat(), "start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(), "end": (period2 + timedelta(hours=1)).isoformat(),
"max": None, "max": None,
"mean": None, "mean": None,
"min": None, "min": None,
"last_reset": None, "last_reset": last_reset_utc_str,
"state": approx(1.0), "state": approx(1.0),
"sum": approx(1003.0), "sum": approx(1003.0),
}, },
@ -670,11 +688,12 @@ def test_external_statistics_errors(hass_recorder, caplog):
assert "Statistics already compiled" not in caplog.text assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow() zero = dt_util.utcnow()
last_reset = zero.replace(minute=0, second=0, microsecond=0) - timedelta(days=1)
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1) period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
_external_statistics = { _external_statistics = {
"start": period1, "start": period1,
"last_reset": None, "last_reset": last_reset,
"state": 0, "state": 0,
"sum": 2, "sum": 2,
} }
@ -711,7 +730,7 @@ def test_external_statistics_errors(hass_recorder, caplog):
assert list_statistic_ids(hass) == [] assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {} assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
# Attempt to insert statistics for an naive starting time # Attempt to insert statistics for a naive starting time
external_metadata = {**_external_metadata} external_metadata = {**_external_metadata}
external_statistics = { external_statistics = {
**_external_statistics, **_external_statistics,
@ -734,6 +753,106 @@ def test_external_statistics_errors(hass_recorder, caplog):
assert list_statistic_ids(hass) == [] assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {} assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
# Attempt to insert statistics with a naive last_reset
external_metadata = {**_external_metadata}
external_statistics = {
**_external_statistics,
"last_reset": last_reset.replace(tzinfo=None),
}
with pytest.raises(HomeAssistantError):
async_add_external_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
def test_import_statistics_errors(hass_recorder, caplog):
"""Test validation of imported statistics."""
hass = hass_recorder()
wait_recording_done(hass)
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow()
last_reset = zero.replace(minute=0, second=0, microsecond=0) - timedelta(days=1)
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
_external_statistics = {
"start": period1,
"last_reset": last_reset,
"state": 0,
"sum": 2,
}
_external_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": "recorder",
"statistic_id": "sensor.total_energy_import",
"unit_of_measurement": "kWh",
}
# Attempt to insert statistics for an external source
external_metadata = {
**_external_metadata,
"statistic_id": "test:total_energy_import",
}
external_statistics = {**_external_statistics}
with pytest.raises(HomeAssistantError):
async_import_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("test:total_energy_import",)) == {}
# Attempt to insert statistics for the wrong domain
external_metadata = {**_external_metadata, "source": "sensor"}
external_statistics = {**_external_statistics}
with pytest.raises(HomeAssistantError):
async_import_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("sensor.total_energy_import",)) == {}
# Attempt to insert statistics for a naive starting time
external_metadata = {**_external_metadata}
external_statistics = {
**_external_statistics,
"start": period1.replace(tzinfo=None),
}
with pytest.raises(HomeAssistantError):
async_import_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("sensor.total_energy_import",)) == {}
# Attempt to insert statistics for an invalid starting time
external_metadata = {**_external_metadata}
external_statistics = {**_external_statistics, "start": period1.replace(minute=1)}
with pytest.raises(HomeAssistantError):
async_import_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("sensor.total_energy_import",)) == {}
# Attempt to insert statistics with a naive last_reset
external_metadata = {**_external_metadata}
external_statistics = {
**_external_statistics,
"last_reset": last_reset.replace(tzinfo=None),
}
with pytest.raises(HomeAssistantError):
async_import_statistics(hass, external_metadata, (external_statistics,))
wait_recording_done(hass)
assert statistics_during_period(hass, zero, period="hour") == {}
assert list_statistic_ids(hass) == []
assert get_metadata(hass, statistic_ids=("sensor.total_energy_import",)) == {}
@pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"]) @pytest.mark.parametrize("timezone", ["America/Regina", "Europe/Vienna", "UTC"])
@pytest.mark.freeze_time("2021-08-01 00:00:00+00:00") @pytest.mark.freeze_time("2021-08-01 00:00:00+00:00")

View File

@ -9,7 +9,13 @@ from pytest import approx
from homeassistant.components import recorder from homeassistant.components import recorder
from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.statistics import async_add_external_statistics from homeassistant.components.recorder.statistics import (
async_add_external_statistics,
get_last_statistics,
get_metadata,
list_statistic_ids,
statistics_during_period,
)
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM from homeassistant.util.unit_system import METRIC_SYSTEM
@ -547,3 +553,270 @@ async def test_get_statistics_metadata(
"unit_of_measurement": unit, "unit_of_measurement": unit,
} }
] ]
@pytest.mark.parametrize(
"source, statistic_id",
(
("test", "test:total_energy_import"),
("recorder", "sensor.total_energy_import"),
),
)
async def test_import_statistics(
hass, hass_ws_client, recorder_mock, caplog, source, statistic_id
):
"""Test importing statistics."""
client = await hass_ws_client()
assert "Compiling statistics for" not in caplog.text
assert "Statistics already compiled" not in caplog.text
zero = dt_util.utcnow()
period1 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
period2 = zero.replace(minute=0, second=0, microsecond=0) + timedelta(hours=2)
external_statistics1 = {
"start": period1.isoformat(),
"last_reset": None,
"state": 0,
"sum": 2,
}
external_statistics2 = {
"start": period2.isoformat(),
"last_reset": None,
"state": 1,
"sum": 3,
}
external_metadata = {
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": source,
"statistic_id": statistic_id,
"unit_of_measurement": "kWh",
}
await client.send_json(
{
"id": 1,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics1, external_statistics2],
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] is None
await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
statistic_id: [
{
"statistic_id": statistic_id,
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(0.0),
"sum": approx(2.0),
},
{
"statistic_id": statistic_id,
"start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(1.0),
"sum": approx(3.0),
},
]
}
statistic_ids = list_statistic_ids(hass) # TODO
assert statistic_ids == [
{
"has_mean": False,
"has_sum": True,
"statistic_id": statistic_id,
"name": "Total imported energy",
"source": source,
"unit_of_measurement": "kWh",
}
]
metadata = get_metadata(hass, statistic_ids=(statistic_id,))
assert metadata == {
statistic_id: (
1,
{
"has_mean": False,
"has_sum": True,
"name": "Total imported energy",
"source": source,
"statistic_id": statistic_id,
"unit_of_measurement": "kWh",
},
)
}
last_stats = get_last_statistics(hass, 1, statistic_id, True)
assert last_stats == {
statistic_id: [
{
"statistic_id": statistic_id,
"start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(1.0),
"sum": approx(3.0),
},
]
}
# Update the previously inserted statistics
external_statistics = {
"start": period1.isoformat(),
"last_reset": None,
"state": 5,
"sum": 6,
}
await client.send_json(
{
"id": 2,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics],
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] is None
await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
statistic_id: [
{
"statistic_id": statistic_id,
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(5.0),
"sum": approx(6.0),
},
{
"statistic_id": statistic_id,
"start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(1.0),
"sum": approx(3.0),
},
]
}
# Update the previously inserted statistics
external_statistics = {
"start": period1.isoformat(),
"max": 1,
"mean": 2,
"min": 3,
"last_reset": None,
"state": 4,
"sum": 5,
}
await client.send_json(
{
"id": 3,
"type": "recorder/import_statistics",
"metadata": external_metadata,
"stats": [external_statistics],
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] is None
await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
statistic_id: [
{
"statistic_id": statistic_id,
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": approx(1.0),
"mean": approx(2.0),
"min": approx(3.0),
"last_reset": None,
"state": approx(4.0),
"sum": approx(5.0),
},
{
"statistic_id": statistic_id,
"start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(1.0),
"sum": approx(3.0),
},
]
}
await client.send_json(
{
"id": 4,
"type": "recorder/adjust_sum_statistics",
"statistic_id": statistic_id,
"start_time": period2.isoformat(),
"adjustment": 1000.0,
}
)
response = await client.receive_json()
assert response["success"]
await async_wait_recording_done(hass)
stats = statistics_during_period(hass, zero, period="hour")
assert stats == {
statistic_id: [
{
"statistic_id": statistic_id,
"start": period1.isoformat(),
"end": (period1 + timedelta(hours=1)).isoformat(),
"max": approx(1.0),
"mean": approx(2.0),
"min": approx(3.0),
"last_reset": None,
"state": approx(4.0),
"sum": approx(5.0),
},
{
"statistic_id": statistic_id,
"start": period2.isoformat(),
"end": (period2 + timedelta(hours=1)).isoformat(),
"max": None,
"mean": None,
"min": None,
"last_reset": None,
"state": approx(1.0),
"sum": approx(1003.0),
},
]
}