Improve sensor statistics tests (#52386)

Erik Montnemery 2021-07-02 09:51:47 +02:00 committed by GitHub
parent 887753e06d
commit 16d2dcbfb2
3 changed files with 403 additions and 157 deletions


@@ -45,6 +45,8 @@ QUERY_STATISTIC_META = [
STATISTICS_BAKERY = "recorder_statistics_bakery"
STATISTICS_META_BAKERY = "recorder_statistics_bakery"
# Convert pressure and temperature statistics from the native unit used for statistics
# to the units configured by the user
UNIT_CONVERSIONS = {
PRESSURE_PA: lambda x, units: pressure_util.convert(
x, PRESSURE_PA, units.pressure_unit
@@ -137,7 +139,8 @@ def _get_meta_data(hass, session, statistic_ids):
return {id: _meta(result, id) for id in statistic_ids}
def _unit_system_unit(unit: str, units) -> str:
def _configured_unit(unit: str, units) -> str:
"""Return the pressure and temperature units configured by the user."""
if unit == PRESSURE_PA:
return units.pressure_unit
if unit == TEMP_CELSIUS:
@@ -146,7 +149,7 @@ def _unit_system_unit(unit: str, units) -> str:
def list_statistic_ids(hass, statistic_type=None):
"""Return statistic_ids."""
"""Return statistic_ids and meta data."""
units = hass.config.units
with session_scope(hass=hass) as session:
baked_query = hass.data[STATISTICS_BAKERY](
@@ -161,13 +164,14 @@ def list_statistic_ids(hass, statistic_type=None):
baked_query += lambda q: q.order_by(Statistics.statistic_id)
result = execute(baked_query(session))
statistic_ids_list = [statistic_id[0] for statistic_id in result]
statistic_ids = _get_meta_data(hass, session, statistic_ids_list)
for statistic_id in statistic_ids.values():
unit = _unit_system_unit(statistic_id["unit_of_measurement"], units)
statistic_id["unit_of_measurement"] = unit
return list(statistic_ids.values())
statistic_ids = [statistic_id[0] for statistic_id in result]
meta_data = _get_meta_data(hass, session, statistic_ids)
for item in meta_data.values():
unit = _configured_unit(item["unit_of_measurement"], units)
item["unit_of_measurement"] = unit
return list(meta_data.values())
def statistics_during_period(hass, start_time, end_time=None, statistic_ids=None):
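For context, a minimal standalone sketch (not part of the diff) of what the renamed helper does: _configured_unit maps the native units used in the statistics table to whatever the user has configured, and list_statistic_ids now applies it to every metadata row before returning the list. FakeUnits below is an invented stand-in for hass.config.units.

# Sketch only: FakeUnits mimics hass.config.units for a user on imperial units;
# the real object is a homeassistant.util.unit_system.UnitSystem.
from dataclasses import dataclass

PRESSURE_PA = "Pa"
TEMP_CELSIUS = "°C"


@dataclass
class FakeUnits:
    pressure_unit: str = "psi"
    temperature_unit: str = "°F"


def _configured_unit(unit: str, units: FakeUnits) -> str:
    """Return the pressure and temperature units configured by the user."""
    if unit == PRESSURE_PA:
        return units.pressure_unit
    if unit == TEMP_CELSIUS:
        return units.temperature_unit
    return unit


# Metadata is stored with the native unit but reported in the configured unit:
meta = {"statistic_id": "sensor.outdoor_pressure", "unit_of_measurement": "Pa"}
meta["unit_of_measurement"] = _configured_unit(meta["unit_of_measurement"], FakeUnits())
print(meta)  # {'statistic_id': 'sensor.outdoor_pressure', 'unit_of_measurement': 'psi'}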


@@ -54,6 +54,7 @@ DEVICE_CLASS_STATISTICS = {
DEVICE_CLASS_TEMPERATURE: {"mean", "min", "max"},
}
# Normalized units which will be stored in the statistics table
DEVICE_CLASS_UNITS = {
DEVICE_CLASS_ENERGY: ENERGY_KILO_WATT_HOUR,
DEVICE_CLASS_POWER: POWER_WATT,
@@ -62,14 +63,18 @@ DEVICE_CLASS_UNITS = {
}
UNIT_CONVERSIONS: dict[str, dict[str, Callable]] = {
# Convert energy to kWh
DEVICE_CLASS_ENERGY: {
ENERGY_KILO_WATT_HOUR: lambda x: x,
ENERGY_WATT_HOUR: lambda x: x / 1000,
},
# Convert power to W
DEVICE_CLASS_POWER: {
POWER_WATT: lambda x: x,
POWER_KILO_WATT: lambda x: x * 1000,
},
# Convert pressure to Pa
# Note: pressure_util.convert is bypassed to avoid redundant error checking
DEVICE_CLASS_PRESSURE: {
PRESSURE_BAR: lambda x: x / pressure_util.UNIT_CONVERSION[PRESSURE_BAR],
PRESSURE_HPA: lambda x: x / pressure_util.UNIT_CONVERSION[PRESSURE_HPA],
@@ -78,6 +83,8 @@ UNIT_CONVERSIONS: dict[str, dict[str, Callable]] = {
PRESSURE_PA: lambda x: x / pressure_util.UNIT_CONVERSION[PRESSURE_PA],
PRESSURE_PSI: lambda x: x / pressure_util.UNIT_CONVERSION[PRESSURE_PSI],
},
# Convert temperature to °C
# Note: temperature_util.convert is bypassed to avoid redundant error checking
DEVICE_CLASS_TEMPERATURE: {
TEMP_CELSIUS: lambda x: x,
TEMP_FAHRENHEIT: temperature_util.fahrenheit_to_celsius,
@@ -85,6 +92,7 @@ UNIT_CONVERSIONS: dict[str, dict[str, Callable]] = {
},
}
# Keep track of entities for which a warning about an unsupported unit has been logged
WARN_UNSUPPORTED_UNIT = set()
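A rough, self-contained sketch of how a table like UNIT_CONVERSIONS is applied when compiling statistics: look up the device class, then the source unit, and normalize the reading to the unit kept in the statistics table. The factors are restated inline so the snippet runs without Home Assistant (the real table pulls them from pressure_util and temperature_util), and unsupported combinations would be warned about once per entity via WARN_UNSUPPORTED_UNIT rather than raising as done here.

# Standalone illustration of the lookup-and-normalize pattern; the conversion
# factors are hard-coded purely for this sketch.
UNIT_CONVERSIONS = {
    "energy": {"kWh": lambda x: x, "Wh": lambda x: x / 1000},  # -> kWh
    "power": {"W": lambda x: x, "kW": lambda x: x * 1000},  # -> W
    "pressure": {
        "Pa": lambda x: x,
        "hPa": lambda x: x * 100,
        "psi": lambda x: x * 6894.757,
    },  # -> Pa
    "temperature": {"°C": lambda x: x, "°F": lambda x: (x - 32) * 5 / 9},  # -> °C
}


def normalize(device_class: str, unit: str, value: float) -> float:
    """Convert a sensor reading to the unit stored in the statistics table."""
    try:
        return UNIT_CONVERSIONS[device_class][unit](value)
    except KeyError:
        # The real code warns and skips the entity instead of raising.
        raise ValueError(f"unsupported unit {unit!r} for {device_class!r}") from None


print(normalize("temperature", "°F", 30))  # -1.111... (°C)
print(normalize("pressure", "hPa", 1000))  # 100000.0 (Pa)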


@@ -1,32 +1,141 @@
"""The tests for sensor recorder platform."""
# pylint: disable=protected-access,invalid-name
from datetime import timedelta
from unittest.mock import patch, sentinel
from unittest.mock import patch
import pytest
from pytest import approx
from homeassistant.components.recorder import history
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.components.recorder.models import process_timestamp_to_utc_isoformat
from homeassistant.components.recorder.statistics import statistics_during_period
from homeassistant.const import STATE_UNAVAILABLE, TEMP_CELSIUS
from homeassistant.components.recorder.statistics import (
list_statistic_ids,
statistics_during_period,
)
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.setup import setup_component
import homeassistant.util.dt as dt_util
from tests.components.recorder.common import wait_recording_done
ENERGY_SENSOR_ATTRIBUTES = {
"device_class": "energy",
"state_class": "measurement",
"unit_of_measurement": "kWh",
}
POWER_SENSOR_ATTRIBUTES = {
"device_class": "power",
"state_class": "measurement",
"unit_of_measurement": "kW",
}
PRESSURE_SENSOR_ATTRIBUTES = {
"device_class": "pressure",
"state_class": "measurement",
"unit_of_measurement": "hPa",
}
TEMPERATURE_SENSOR_ATTRIBUTES = {
"device_class": "temperature",
"state_class": "measurement",
"unit_of_measurement": "°C",
}
def test_compile_hourly_statistics(hass_recorder):
@pytest.mark.parametrize(
"device_class,unit,native_unit,mean,min,max",
[
("battery", "%", "%", 16.440677, 10, 30),
("battery", None, None, 16.440677, 10, 30),
("humidity", "%", "%", 16.440677, 10, 30),
("humidity", None, None, 16.440677, 10, 30),
("pressure", "Pa", "Pa", 16.440677, 10, 30),
("pressure", "hPa", "Pa", 1644.0677, 1000, 3000),
("pressure", "mbar", "Pa", 1644.0677, 1000, 3000),
("pressure", "inHg", "Pa", 55674.53, 33863.89, 101591.67),
("pressure", "psi", "Pa", 113354.48, 68947.57, 206842.71),
("temperature", "°C", "°C", 16.440677, 10, 30),
("temperature", "°F", "°C", -8.644068, -12.22222, -1.111111),
],
)
def test_compile_hourly_statistics(
hass_recorder, caplog, device_class, unit, native_unit, mean, min, max
):
"""Test compiling hourly statistics."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
zero, four, states = record_states(hass)
attributes = {
"device_class": device_class,
"state_class": "measurement",
"unit_of_measurement": unit,
}
four, states = record_states(hass, zero, "sensor.test1", attributes)
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
recorder.do_adhoc_statistics(period="hourly", start=zero)
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
]
stats = statistics_during_period(hass, zero)
assert stats == {
"sensor.test1": [
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(zero),
"mean": approx(mean),
"min": approx(min),
"max": approx(max),
"last_reset": None,
"state": None,
"sum": None,
}
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
@pytest.mark.parametrize("attributes", [TEMPERATURE_SENSOR_ATTRIBUTES])
def test_compile_hourly_statistics_unsupported(hass_recorder, caplog, attributes):
"""Test compiling hourly statistics for unsupported sensor."""
attributes = dict(attributes)
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
four, states = record_states(hass, zero, "sensor.test1", attributes)
if "unit_of_measurement" in attributes:
attributes["unit_of_measurement"] = "invalid"
_, _states = record_states(hass, zero, "sensor.test2", attributes)
states = {**states, **_states}
attributes.pop("unit_of_measurement")
_, _states = record_states(hass, zero, "sensor.test3", attributes)
states = {**states, **_states}
attributes["state_class"] = "invalid"
_, _states = record_states(hass, zero, "sensor.test4", attributes)
states = {**states, **_states}
attributes.pop("state_class")
_, _states = record_states(hass, zero, "sensor.test5", attributes)
states = {**states, **_states}
attributes["state_class"] = "measurement"
_, _states = record_states(hass, zero, "sensor.test6", attributes)
states = {**states, **_states}
attributes["state_class"] = "unsupported"
_, _states = record_states(hass, zero, "sensor.test7", attributes)
states = {**states, **_states}
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
recorder.do_adhoc_statistics(period="hourly", start=zero)
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "°C"}
]
stats = statistics_during_period(hass, zero)
assert stats == {
"sensor.test1": [
@@ -42,23 +151,36 @@ def test_compile_hourly_statistics(hass_recorder):
}
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
def test_compile_hourly_energy_statistics(hass_recorder):
@pytest.mark.parametrize(
"device_class,unit,native_unit,factor",
[
("energy", "kWh", "kWh", 1),
("energy", "Wh", "kWh", 1 / 1000),
("monetary", "", "", 1),
("monetary", "SEK", "SEK", 1),
],
)
def test_compile_hourly_energy_statistics(
hass_recorder, caplog, device_class, unit, native_unit, factor
):
"""Test compiling hourly statistics."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
sns1_attr = {
"device_class": "energy",
attributes = {
"device_class": device_class,
"state_class": "measurement",
"unit_of_measurement": "kWh",
"unit_of_measurement": unit,
"last_reset": None,
}
sns2_attr = {"device_class": "energy"}
sns3_attr = {}
seq = [10, 15, 20, 10, 30, 40, 50, 60, 70]
zero, four, eight, states = record_energy_states(
hass, sns1_attr, sns2_attr, sns3_attr
four, eight, states = record_energy_states(
hass, zero, "sensor.test1", attributes, seq
)
hist = history.get_significant_states(
hass, zero - timedelta.resolution, eight + timedelta.resolution
@@ -71,6 +193,97 @@ def test_compile_hourly_energy_statistics(hass_recorder):
wait_recording_done(hass)
recorder.do_adhoc_statistics(period="hourly", start=zero + timedelta(hours=2))
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": native_unit}
]
stats = statistics_during_period(hass, zero)
assert stats == {
"sensor.test1": [
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(zero),
"max": None,
"mean": None,
"min": None,
"last_reset": process_timestamp_to_utc_isoformat(zero),
"state": approx(factor * seq[2]),
"sum": approx(factor * 10.0),
},
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(zero + timedelta(hours=1)),
"max": None,
"mean": None,
"min": None,
"last_reset": process_timestamp_to_utc_isoformat(four),
"state": approx(factor * seq[5]),
"sum": approx(factor * 10.0),
},
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(zero + timedelta(hours=2)),
"max": None,
"mean": None,
"min": None,
"last_reset": process_timestamp_to_utc_isoformat(four),
"state": approx(factor * seq[8]),
"sum": approx(factor * 40.0),
},
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
def test_compile_hourly_energy_statistics_unsupported(hass_recorder, caplog):
"""Test compiling hourly statistics."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
sns1_attr = {
"device_class": "energy",
"state_class": "measurement",
"unit_of_measurement": "kWh",
"last_reset": None,
}
sns2_attr = {"device_class": "energy"}
sns3_attr = {}
sns4_attr = {
"device_class": "energy",
"state_class": "measurement",
"unit_of_measurement": "kWh",
}
seq1 = [10, 15, 20, 10, 30, 40, 50, 60, 70]
seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75]
seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90]
seq4 = [0, 0, 5, 10, 30, 50, 60, 80, 90]
four, eight, states = record_energy_states(
hass, zero, "sensor.test1", sns1_attr, seq1
)
_, _, _states = record_energy_states(hass, zero, "sensor.test2", sns2_attr, seq2)
states = {**states, **_states}
_, _, _states = record_energy_states(hass, zero, "sensor.test3", sns3_attr, seq3)
states = {**states, **_states}
_, _, _states = record_energy_states(hass, zero, "sensor.test4", sns4_attr, seq4)
states = {**states, **_states}
hist = history.get_significant_states(
hass, zero - timedelta.resolution, eight + timedelta.resolution
)
assert dict(states)["sensor.test1"] == dict(hist)["sensor.test1"]
recorder.do_adhoc_statistics(period="hourly", start=zero)
wait_recording_done(hass)
recorder.do_adhoc_statistics(period="hourly", start=zero + timedelta(hours=1))
wait_recording_done(hass)
recorder.do_adhoc_statistics(period="hourly", start=zero + timedelta(hours=2))
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "kWh"}
]
stats = statistics_during_period(hass, zero)
assert stats == {
"sensor.test1": [
@@ -106,32 +319,37 @@ def test_compile_hourly_energy_statistics(hass_recorder):
},
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
def test_compile_hourly_energy_statistics2(hass_recorder):
"""Test compiling hourly statistics."""
def test_compile_hourly_energy_statistics_multiple(hass_recorder, caplog):
"""Test compiling multiple hourly statistics."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
sns1_attr = {
"device_class": "energy",
"state_class": "measurement",
"unit_of_measurement": "kWh",
}
sns2_attr = {
"device_class": "energy",
"state_class": "measurement",
"unit_of_measurement": "kWh",
}
sns1_attr = {**ENERGY_SENSOR_ATTRIBUTES, "last_reset": None}
sns2_attr = {**ENERGY_SENSOR_ATTRIBUTES, "last_reset": None}
sns3_attr = {
"device_class": "energy",
"state_class": "measurement",
**ENERGY_SENSOR_ATTRIBUTES,
"unit_of_measurement": "Wh",
"last_reset": None,
}
sns4_attr = {**ENERGY_SENSOR_ATTRIBUTES}
seq1 = [10, 15, 20, 10, 30, 40, 50, 60, 70]
seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75]
seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90]
seq4 = [0, 0, 5, 10, 30, 50, 60, 80, 90]
zero, four, eight, states = record_energy_states(
hass, sns1_attr, sns2_attr, sns3_attr
four, eight, states = record_energy_states(
hass, zero, "sensor.test1", sns1_attr, seq1
)
_, _, _states = record_energy_states(hass, zero, "sensor.test2", sns2_attr, seq2)
states = {**states, **_states}
_, _, _states = record_energy_states(hass, zero, "sensor.test3", sns3_attr, seq3)
states = {**states, **_states}
_, _, _states = record_energy_states(hass, zero, "sensor.test4", sns4_attr, seq4)
states = {**states, **_states}
hist = history.get_significant_states(
hass, zero - timedelta.resolution, eight + timedelta.resolution
)
@@ -143,6 +361,12 @@ def test_compile_hourly_energy_statistics2(hass_recorder):
wait_recording_done(hass)
recorder.do_adhoc_statistics(period="hourly", start=zero + timedelta(hours=2))
wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass)
assert statistic_ids == [
{"statistic_id": "sensor.test1", "unit_of_measurement": "kWh"},
{"statistic_id": "sensor.test2", "unit_of_measurement": "kWh"},
{"statistic_id": "sensor.test3", "unit_of_measurement": "kWh"},
]
stats = statistics_during_period(hass, zero)
assert stats == {
"sensor.test1": [
@@ -242,14 +466,39 @@ def test_compile_hourly_energy_statistics2(hass_recorder):
},
],
}
assert "Error while processing event StatisticsTask" not in caplog.text
def test_compile_hourly_statistics_unchanged(hass_recorder):
@pytest.mark.parametrize(
"device_class,unit,value",
[
("battery", "%", 30),
("battery", None, 30),
("humidity", "%", 30),
("humidity", None, 30),
("pressure", "Pa", 30),
("pressure", "hPa", 3000),
("pressure", "mbar", 3000),
("pressure", "inHg", 101591.67),
("pressure", "psi", 206842.71),
("temperature", "°C", 30),
("temperature", "°F", -1.111111),
],
)
def test_compile_hourly_statistics_unchanged(
hass_recorder, caplog, device_class, unit, value
):
"""Test compiling hourly statistics, with no changes during the hour."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
zero, four, states = record_states(hass)
attributes = {
"device_class": device_class,
"state_class": "measurement",
"unit_of_measurement": unit,
}
four, states = record_states(hass, zero, "sensor.test1", attributes)
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
@@ -261,23 +510,27 @@ def test_compile_hourly_statistics_unchanged(hass_recorder):
{
"statistic_id": "sensor.test1",
"start": process_timestamp_to_utc_isoformat(four),
"mean": approx(30.0),
"min": approx(30.0),
"max": approx(30.0),
"mean": approx(value),
"min": approx(value),
"max": approx(value),
"last_reset": None,
"state": None,
"sum": None,
}
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
def test_compile_hourly_statistics_partially_unavailable(hass_recorder):
def test_compile_hourly_statistics_partially_unavailable(hass_recorder, caplog):
"""Test compiling hourly statistics, with the sensor being partially unavailable."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
zero, four, states = record_states_partially_unavailable(hass)
four, states = record_states_partially_unavailable(
hass, zero, "sensor.test1", TEMPERATURE_SENSOR_ATTRIBUTES
)
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
@@ -298,39 +551,87 @@ def test_compile_hourly_statistics_partially_unavailable(hass_recorder):
}
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
def test_compile_hourly_statistics_unavailable(hass_recorder):
@pytest.mark.parametrize(
"device_class,unit,value",
[
("battery", "%", 30),
("battery", None, 30),
("humidity", "%", 30),
("humidity", None, 30),
("pressure", "Pa", 30),
("pressure", "hPa", 3000),
("pressure", "mbar", 3000),
("pressure", "inHg", 101591.67),
("pressure", "psi", 206842.71),
("temperature", "°C", 30),
("temperature", "°F", -1.111111),
],
)
def test_compile_hourly_statistics_unavailable(
hass_recorder, caplog, device_class, unit, value
):
"""Test compiling hourly statistics, with the sensor being unavailable."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
zero, four, states = record_states_partially_unavailable(hass)
attributes = {
"device_class": device_class,
"state_class": "measurement",
"unit_of_measurement": unit,
}
four, states = record_states_partially_unavailable(
hass, zero, "sensor.test1", attributes
)
_, _states = record_states(hass, zero, "sensor.test2", attributes)
states = {**states, **_states}
hist = history.get_significant_states(hass, zero, four)
assert dict(states) == dict(hist)
recorder.do_adhoc_statistics(period="hourly", start=four)
wait_recording_done(hass)
stats = statistics_during_period(hass, four)
assert stats == {}
assert stats == {
"sensor.test2": [
{
"statistic_id": "sensor.test2",
"start": process_timestamp_to_utc_isoformat(four),
"mean": approx(value),
"min": approx(value),
"max": approx(value),
"last_reset": None,
"state": None,
"sum": None,
}
]
}
assert "Error while processing event StatisticsTask" not in caplog.text
def record_states(hass):
def test_compile_hourly_statistics_fails(hass_recorder, caplog):
"""Test compiling hourly statistics throws."""
zero = dt_util.utcnow()
hass = hass_recorder()
recorder = hass.data[DATA_INSTANCE]
setup_component(hass, "sensor", {})
with patch(
"homeassistant.components.sensor.recorder.compile_statistics",
side_effect=Exception,
):
recorder.do_adhoc_statistics(period="hourly", start=zero)
wait_recording_done(hass)
assert "Error while processing event StatisticsTask" in caplog.text
def record_states(hass, zero, entity_id, attributes):
"""Record some test states.
We inject a bunch of state updates for temperature sensors.
"""
mp = "media_player.test"
sns1 = "sensor.test1"
sns2 = "sensor.test2"
sns3 = "sensor.test3"
sns1_attr = {
"device_class": "temperature",
"state_class": "measurement",
"unit_of_measurement": TEMP_CELSIUS,
}
sns2_attr = {"device_class": "temperature"}
sns3_attr = {}
attributes = dict(attributes)
def set_state(entity_id, state, **kwargs):
"""Set the state."""
@@ -338,46 +639,29 @@ def record_states(hass):
wait_recording_done(hass)
return hass.states.get(entity_id)
zero = dt_util.utcnow()
one = zero + timedelta(minutes=1)
two = one + timedelta(minutes=10)
three = two + timedelta(minutes=40)
four = three + timedelta(minutes=10)
states = {mp: [], sns1: [], sns2: [], sns3: []}
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
states[mp].append(
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
)
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "10", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
states[entity_id].append(set_state(entity_id, "10", attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
states[sns1].append(set_state(sns1, "15", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "15", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "15", attributes=sns3_attr))
states[entity_id].append(set_state(entity_id, "15", attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
states[sns1].append(set_state(sns1, "30", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "30", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "30", attributes=sns3_attr))
states[entity_id].append(set_state(entity_id, "30", attributes=attributes))
return zero, four, states
return four, states
def record_energy_states(hass, _sns1_attr, _sns2_attr, _sns3_attr):
def record_energy_states(hass, zero, entity_id, _attributes, seq):
"""Record some test states.
We inject a bunch of state updates for energy sensors.
"""
sns1 = "sensor.test1"
sns2 = "sensor.test2"
sns3 = "sensor.test3"
sns4 = "sensor.test4"
def set_state(entity_id, state, **kwargs):
"""Set the state."""
@@ -385,7 +669,6 @@ def record_energy_states(hass, _sns1_attr, _sns2_attr, _sns3_attr):
wait_recording_done(hass)
return hass.states.get(entity_id)
zero = dt_util.utcnow()
one = zero + timedelta(minutes=15)
two = one + timedelta(minutes=30)
three = two + timedelta(minutes=15)
@@ -395,88 +678,50 @@ def record_energy_states(hass, _sns1_attr, _sns2_attr, _sns3_attr):
seven = six + timedelta(minutes=15)
eight = seven + timedelta(minutes=30)
sns1_attr = {**_sns1_attr, "last_reset": zero.isoformat()}
sns2_attr = {**_sns2_attr, "last_reset": zero.isoformat()}
sns3_attr = {**_sns3_attr, "last_reset": zero.isoformat()}
sns4_attr = {**_sns3_attr}
attributes = dict(_attributes)
if "last_reset" in _attributes:
attributes["last_reset"] = zero.isoformat()
states = {sns1: [], sns2: [], sns3: [], sns4: []}
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=zero):
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr)) # Sum 0
states[sns2].append(set_state(sns2, "110", attributes=sns2_attr)) # Sum 0
states[sns3].append(set_state(sns3, "0", attributes=sns3_attr)) # Sum 0
states[sns4].append(set_state(sns4, "0", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
states[sns1].append(set_state(sns1, "15", attributes=sns1_attr)) # Sum 5
states[sns2].append(set_state(sns2, "120", attributes=sns2_attr)) # Sum 10
states[sns3].append(set_state(sns3, "0", attributes=sns3_attr)) # Sum 0
states[sns4].append(set_state(sns4, "0", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
states[sns1].append(set_state(sns1, "20", attributes=sns1_attr)) # Sum 10
states[sns2].append(set_state(sns2, "130", attributes=sns2_attr)) # Sum 20
states[sns3].append(set_state(sns3, "5", attributes=sns3_attr)) # Sum 5
states[sns4].append(set_state(sns4, "5", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr)) # Sum 0
states[sns2].append(set_state(sns2, "0", attributes=sns2_attr)) # Sum -110
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr)) # Sum 10
states[sns4].append(set_state(sns4, "10", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes))
sns1_attr = {**_sns1_attr, "last_reset": four.isoformat()}
sns2_attr = {**_sns2_attr, "last_reset": four.isoformat()}
sns3_attr = {**_sns3_attr, "last_reset": four.isoformat()}
attributes = dict(_attributes)
if "last_reset" in _attributes:
attributes["last_reset"] = four.isoformat()
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=four):
states[sns1].append(set_state(sns1, "30", attributes=sns1_attr)) # Sum 0
states[sns2].append(set_state(sns2, "30", attributes=sns2_attr)) # Sum -110
states[sns3].append(set_state(sns3, "30", attributes=sns3_attr)) # Sum 10
states[sns4].append(set_state(sns4, "30", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=five):
states[sns1].append(set_state(sns1, "40", attributes=sns1_attr)) # Sum 10
states[sns2].append(set_state(sns2, "45", attributes=sns2_attr)) # Sum -95
states[sns3].append(set_state(sns3, "50", attributes=sns3_attr)) # Sum 30
states[sns4].append(set_state(sns4, "50", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=six):
states[sns1].append(set_state(sns1, "50", attributes=sns1_attr)) # Sum 20
states[sns2].append(set_state(sns2, "55", attributes=sns2_attr)) # Sum -85
states[sns3].append(set_state(sns3, "60", attributes=sns3_attr)) # Sum 40
states[sns4].append(set_state(sns4, "60", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=seven):
states[sns1].append(set_state(sns1, "60", attributes=sns1_attr)) # Sum 30
states[sns2].append(set_state(sns2, "65", attributes=sns2_attr)) # Sum -75
states[sns3].append(set_state(sns3, "80", attributes=sns3_attr)) # Sum 60
states[sns4].append(set_state(sns4, "80", attributes=sns4_attr)) # -
states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=eight):
states[sns1].append(set_state(sns1, "70", attributes=sns1_attr)) # Sum 40
states[sns2].append(set_state(sns2, "75", attributes=sns2_attr)) # Sum -65
states[sns3].append(set_state(sns3, "90", attributes=sns3_attr)) # Sum 70
states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes))
return zero, four, eight, states
return four, eight, states
def record_states_partially_unavailable(hass):
def record_states_partially_unavailable(hass, zero, entity_id, attributes):
"""Record some test states.
We inject a bunch of state updates for temperature sensors.
"""
mp = "media_player.test"
sns1 = "sensor.test1"
sns2 = "sensor.test2"
sns3 = "sensor.test3"
sns1_attr = {
"device_class": "temperature",
"state_class": "measurement",
"unit_of_measurement": TEMP_CELSIUS,
}
sns2_attr = {"device_class": "temperature"}
sns3_attr = {}
def set_state(entity_id, state, **kwargs):
"""Set the state."""
@@ -484,32 +729,21 @@ def record_states_partially_unavailable(hass):
wait_recording_done(hass)
return hass.states.get(entity_id)
zero = dt_util.utcnow()
one = zero + timedelta(minutes=1)
two = one + timedelta(minutes=15)
three = two + timedelta(minutes=30)
four = three + timedelta(minutes=15)
states = {mp: [], sns1: [], sns2: [], sns3: []}
states = {entity_id: []}
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=one):
states[mp].append(
set_state(mp, "idle", attributes={"media_title": str(sentinel.mt1)})
)
states[mp].append(
set_state(mp, "YouTube", attributes={"media_title": str(sentinel.mt2)})
)
states[sns1].append(set_state(sns1, "10", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "10", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "10", attributes=sns3_attr))
states[entity_id].append(set_state(entity_id, "10", attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=two):
states[sns1].append(set_state(sns1, "25", attributes=sns1_attr))
states[sns2].append(set_state(sns2, "25", attributes=sns2_attr))
states[sns3].append(set_state(sns3, "25", attributes=sns3_attr))
states[entity_id].append(set_state(entity_id, "25", attributes=attributes))
with patch("homeassistant.components.recorder.dt_util.utcnow", return_value=three):
states[sns1].append(set_state(sns1, STATE_UNAVAILABLE, attributes=sns1_attr))
states[sns2].append(set_state(sns2, STATE_UNAVAILABLE, attributes=sns2_attr))
states[sns3].append(set_state(sns3, STATE_UNAVAILABLE, attributes=sns3_attr))
states[entity_id].append(
set_state(entity_id, STATE_UNAVAILABLE, attributes=attributes)
)
return zero, four, states
return four, states