Update sensor tests to avoid patching utcnow (#93497)

This commit is contained in:
J. Nick Koston 2023-05-24 19:54:08 -05:00 committed by GitHub
parent d5e09bd4c3
commit 7ec6e03d5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -7,6 +7,8 @@ import math
from statistics import mean from statistics import mean
from unittest.mock import patch from unittest.mock import patch
from freezegun import freeze_time
from freezegun.api import FrozenDateTimeFactory
import pytest import pytest
from homeassistant import loader from homeassistant import loader
@ -155,7 +157,8 @@ def test_compile_hourly_statistics(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
) )
@ -251,17 +254,13 @@ def test_compile_hourly_statistics_with_some_same_last_updated(
four = three + timedelta(seconds=10 * 5) four = three + timedelta(seconds=10 * 5)
states = {entity_id: []} states = {entity_id: []}
with patch( with freeze_time(one) as freezer:
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[entity_id].append( states[entity_id].append(
set_state(entity_id, str(seq[0]), attributes=attributes) set_state(entity_id, str(seq[0]), attributes=attributes)
) )
# Record two states at the exact same time # Record two states at the exact same time
with patch( freezer.move_to(two)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[entity_id].append( states[entity_id].append(
set_state(entity_id, str(seq[1]), attributes=attributes) set_state(entity_id, str(seq[1]), attributes=attributes)
) )
@ -269,9 +268,7 @@ def test_compile_hourly_statistics_with_some_same_last_updated(
set_state(entity_id, str(seq[2]), attributes=attributes) set_state(entity_id, str(seq[2]), attributes=attributes)
) )
with patch( freezer.move_to(three)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[entity_id].append( states[entity_id].append(
set_state(entity_id, str(seq[3]), attributes=attributes) set_state(entity_id, str(seq[3]), attributes=attributes)
) )
@ -371,9 +368,7 @@ def test_compile_hourly_statistics_with_all_same_last_updated(
four = three + timedelta(seconds=10 * 5) four = three + timedelta(seconds=10 * 5)
states = {entity_id: []} states = {entity_id: []}
with patch( with freeze_time(two):
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[entity_id].append( states[entity_id].append(
set_state(entity_id, str(seq[0]), attributes=attributes) set_state(entity_id, str(seq[0]), attributes=attributes)
) )
@ -480,9 +475,7 @@ def test_compile_hourly_statistics_only_state_is_and_end_of_period(
end = zero + timedelta(minutes=5) end = zero + timedelta(minutes=5)
states = {entity_id: []} states = {entity_id: []}
with patch( with freeze_time(end):
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=end
):
states[entity_id].append( states[entity_id].append(
set_state(entity_id, str(seq[0]), attributes=attributes) set_state(entity_id, str(seq[0]), attributes=attributes)
) )
@ -559,7 +552,8 @@ def test_compile_hourly_statistics_purged_state_changes(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
) )
@ -568,9 +562,7 @@ def test_compile_hourly_statistics_purged_state_changes(
mean = min = max = float(hist["sensor.test1"][-1].state) mean = min = max = float(hist["sensor.test1"][-1].state)
# Purge all states from the database # Purge all states from the database
with patch( with freeze_time(four):
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=four
):
hass.services.call("recorder", "purge", {"keep_days": 0}) hass.services.call("recorder", "purge", {"keep_days": 0})
hass.block_till_done() hass.block_till_done()
wait_recording_done(hass) wait_recording_done(hass)
@ -623,31 +615,32 @@ def test_compile_hourly_statistics_wrong_unit(
hass = hass_recorder() hass = hass_recorder()
setup_component(hass, "sensor", {}) setup_component(hass, "sensor", {})
wait_recording_done(hass) # Wait for the sensor recorder platform to be added wait_recording_done(hass) # Wait for the sensor recorder platform to be added
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
attributes_tmp = dict(attributes) attributes_tmp = dict(attributes)
attributes_tmp["unit_of_measurement"] = "invalid" attributes_tmp["unit_of_measurement"] = "invalid"
_, _states = record_states(hass, zero, "sensor.test2", attributes_tmp) _, _states = record_states(hass, freezer, zero, "sensor.test2", attributes_tmp)
states = {**states, **_states} states = {**states, **_states}
attributes_tmp.pop("unit_of_measurement") attributes_tmp.pop("unit_of_measurement")
_, _states = record_states(hass, zero, "sensor.test3", attributes_tmp) _, _states = record_states(hass, freezer, zero, "sensor.test3", attributes_tmp)
states = {**states, **_states} states = {**states, **_states}
attributes_tmp = dict(attributes) attributes_tmp = dict(attributes)
attributes_tmp["state_class"] = "invalid" attributes_tmp["state_class"] = "invalid"
_, _states = record_states(hass, zero, "sensor.test4", attributes_tmp) _, _states = record_states(hass, freezer, zero, "sensor.test4", attributes_tmp)
states = {**states, **_states} states = {**states, **_states}
attributes_tmp.pop("state_class") attributes_tmp.pop("state_class")
_, _states = record_states(hass, zero, "sensor.test5", attributes_tmp) _, _states = record_states(hass, freezer, zero, "sensor.test5", attributes_tmp)
states = {**states, **_states} states = {**states, **_states}
attributes_tmp = dict(attributes) attributes_tmp = dict(attributes)
attributes_tmp["device_class"] = "invalid" attributes_tmp["device_class"] = "invalid"
_, _states = record_states(hass, zero, "sensor.test6", attributes_tmp) _, _states = record_states(hass, freezer, zero, "sensor.test6", attributes_tmp)
states = {**states, **_states} states = {**states, **_states}
attributes_tmp.pop("device_class") attributes_tmp.pop("device_class")
_, _states = record_states(hass, zero, "sensor.test7", attributes_tmp) _, _states = record_states(hass, freezer, zero, "sensor.test7", attributes_tmp)
states = {**states, **_states} states = {**states, **_states}
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -832,7 +825,6 @@ async def test_compile_hourly_sum_statistics_amount(
period0_end = period1 = period0 + timedelta(minutes=5) period0_end = period1 = period0 + timedelta(minutes=5)
period1_end = period2 = period0 + timedelta(minutes=10) period1_end = period2 = period0 + timedelta(minutes=10)
period2_end = period0 + timedelta(minutes=15) period2_end = period0 + timedelta(minutes=15)
client = await hass_ws_client()
hass.config.units = units hass.config.units = units
await async_setup_component(hass, "sensor", {}) await async_setup_component(hass, "sensor", {})
# Wait for the sensor recorder platform to be added # Wait for the sensor recorder platform to be added
@ -844,10 +836,10 @@ async def test_compile_hourly_sum_statistics_amount(
"last_reset": None, "last_reset": None,
} }
seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] seq = [10, 15, 20, 10, 30, 40, 50, 60, 70]
with freeze_time(period0) as freezer:
four, eight, states = await hass.async_add_executor_job( four, eight, states = await hass.async_add_executor_job(
record_meter_states, hass, period0, "sensor.test1", attributes, seq record_meter_states, hass, freezer, period0, "sensor.test1", attributes, seq
) )
await async_wait_recording_done(hass) await async_wait_recording_done(hass)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -938,6 +930,8 @@ async def test_compile_hourly_sum_statistics_amount(
assert "Compiling initial sum statistics for sensor.test1" in caplog.text assert "Compiling initial sum statistics for sensor.test1" in caplog.text
assert "Detected new cycle for sensor.test1, value dropped" not in caplog.text assert "Detected new cycle for sensor.test1, value dropped" not in caplog.text
client = await hass_ws_client()
# Adjust the inserted statistics # Adjust the inserted statistics
await client.send_json( await client.send_json(
{ {
@ -1028,28 +1022,28 @@ def test_compile_hourly_sum_statistics_amount_reset_every_state_change(
assert seq[0] != seq[-1] assert seq[0] != seq[-1]
states = {"sensor.test1": []} states = {"sensor.test1": []}
with freeze_time(zero) as freezer:
# Insert states for a 1st statistics period
one = zero
for i in range(len(seq)):
one = one + timedelta(seconds=5)
attributes = dict(attributes)
attributes["last_reset"] = dt_util.as_local(one).isoformat()
_states = record_meter_state(
hass, freezer, one, "sensor.test1", attributes, seq[i : i + 1]
)
states["sensor.test1"].extend(_states["sensor.test1"])
# Insert states for a 1st statistics period # Insert states for a 2nd statistics period
one = zero two = zero + timedelta(minutes=5)
for i in range(len(seq)): for i in range(len(seq)):
one = one + timedelta(seconds=5) two = two + timedelta(seconds=5)
attributes = dict(attributes) attributes = dict(attributes)
attributes["last_reset"] = dt_util.as_local(one).isoformat() attributes["last_reset"] = dt_util.as_local(two).isoformat()
_states = record_meter_state( _states = record_meter_state(
hass, one, "sensor.test1", attributes, seq[i : i + 1] hass, freezer, two, "sensor.test1", attributes, seq[i : i + 1]
) )
states["sensor.test1"].extend(_states["sensor.test1"]) states["sensor.test1"].extend(_states["sensor.test1"])
# Insert states for a 2nd statistics period
two = zero + timedelta(minutes=5)
for i in range(len(seq)):
two = two + timedelta(seconds=5)
attributes = dict(attributes)
attributes["last_reset"] = dt_util.as_local(two).isoformat()
_states = record_meter_state(
hass, two, "sensor.test1", attributes, seq[i : i + 1]
)
states["sensor.test1"].extend(_states["sensor.test1"])
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1147,17 +1141,18 @@ def test_compile_hourly_sum_statistics_amount_invalid_last_reset(
states = {"sensor.test1": []} states = {"sensor.test1": []}
# Insert states # Insert states
one = zero with freeze_time(zero) as freezer:
for i in range(len(seq)): one = zero
one = one + timedelta(seconds=5) for i in range(len(seq)):
attributes = dict(attributes) one = one + timedelta(seconds=5)
attributes["last_reset"] = dt_util.as_local(one).isoformat() attributes = dict(attributes)
if i == 3: attributes["last_reset"] = dt_util.as_local(one).isoformat()
attributes["last_reset"] = "festivus" # not a valid time if i == 3:
_states = record_meter_state( attributes["last_reset"] = "festivus" # not a valid time
hass, one, "sensor.test1", attributes, seq[i : i + 1] _states = record_meter_state(
) hass, freezer, one, "sensor.test1", attributes, seq[i : i + 1]
states["sensor.test1"].extend(_states["sensor.test1"]) )
states["sensor.test1"].extend(_states["sensor.test1"])
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1243,15 +1238,16 @@ def test_compile_hourly_sum_statistics_nan_inf_state(
seq = [10, math.nan, 15, 15, 20, math.inf, 20, 10] seq = [10, math.nan, 15, 15, 20, math.inf, 20, 10]
states = {"sensor.test1": []} states = {"sensor.test1": []}
one = zero with freeze_time(zero) as freezer:
for i in range(len(seq)): one = zero
one = one + timedelta(seconds=5) for i in range(len(seq)):
attributes = dict(attributes) one = one + timedelta(seconds=5)
attributes["last_reset"] = dt_util.as_local(one).isoformat() attributes = dict(attributes)
_states = record_meter_state( attributes["last_reset"] = dt_util.as_local(one).isoformat()
hass, one, "sensor.test1", attributes, seq[i : i + 1] _states = record_meter_state(
) hass, freezer, one, "sensor.test1", attributes, seq[i : i + 1]
states["sensor.test1"].extend(_states["sensor.test1"]) )
states["sensor.test1"].extend(_states["sensor.test1"])
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1391,10 +1387,13 @@ def test_compile_hourly_sum_statistics_negative_state(
states[entity_id].append(state) states[entity_id].append(state)
offending_state = 6 offending_state = 6
one = zero one = zero
for i in range(len(seq)): with freeze_time(zero) as freezer:
one = one + timedelta(seconds=5) for i in range(len(seq)):
_states = record_meter_state(hass, one, entity_id, attributes, seq[i : i + 1]) one = one + timedelta(seconds=5)
states[entity_id].extend(_states[entity_id]) _states = record_meter_state(
hass, freezer, one, entity_id, attributes, seq[i : i + 1]
)
states[entity_id].extend(_states[entity_id])
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1486,10 +1485,10 @@ def test_compile_hourly_sum_statistics_total_no_reset(
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] seq = [10, 15, 20, 10, 30, 40, 50, 60, 70]
with freeze_time(period0) as freezer:
four, eight, states = record_meter_states( four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq hass, freezer, period0, "sensor.test1", attributes, seq
) )
wait_recording_done(hass) wait_recording_done(hass)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1598,10 +1597,10 @@ def test_compile_hourly_sum_statistics_total_increasing(
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] seq = [10, 15, 20, 10, 30, 40, 50, 60, 70]
with freeze_time(period0) as freezer:
four, eight, states = record_meter_states( four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq hass, freezer, period0, "sensor.test1", attributes, seq
) )
wait_recording_done(hass) wait_recording_done(hass)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1708,10 +1707,10 @@ def test_compile_hourly_sum_statistics_total_increasing_small_dip(
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
seq = [10, 15, 20, 19, 30, 40, 39, 60, 70] seq = [10, 15, 20, 19, 30, 40, 39, 60, 70]
with freeze_time(period0) as freezer:
four, eight, states = record_meter_states( four, eight, states = record_meter_states(
hass, period0, "sensor.test1", attributes, seq hass, freezer, period0, "sensor.test1", attributes, seq
) )
wait_recording_done(hass) wait_recording_done(hass)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
@ -1816,12 +1815,17 @@ def test_compile_hourly_energy_statistics_unsupported(
seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75] seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75]
seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90] seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90]
four, eight, states = record_meter_states( with freeze_time(period0) as freezer:
hass, period0, "sensor.test1", sns1_attr, seq1 four, eight, states = record_meter_states(
) hass, freezer, period0, "sensor.test1", sns1_attr, seq1
_, _, _states = record_meter_states(hass, period0, "sensor.test2", sns2_attr, seq2) )
states = {**states, **_states} _, _, _states = record_meter_states(
_, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3) hass, freezer, period0, "sensor.test2", sns2_attr, seq2
)
states = {**states, **_states}
_, _, _states = record_meter_states(
hass, freezer, period0, "sensor.test3", sns3_attr, seq3
)
states = {**states, **_states} states = {**states, **_states}
wait_recording_done(hass) wait_recording_done(hass)
@ -1914,12 +1918,17 @@ def test_compile_hourly_energy_statistics_multiple(
seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75] seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75]
seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90] seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90]
four, eight, states = record_meter_states( with freeze_time(period0) as freezer:
hass, period0, "sensor.test1", sns1_attr, seq1 four, eight, states = record_meter_states(
) hass, freezer, period0, "sensor.test1", sns1_attr, seq1
_, _, _states = record_meter_states(hass, period0, "sensor.test2", sns2_attr, seq2) )
states = {**states, **_states} _, _, _states = record_meter_states(
_, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3) hass, freezer, period0, "sensor.test2", sns2_attr, seq2
)
states = {**states, **_states}
_, _, _states = record_meter_states(
hass, freezer, period0, "sensor.test3", sns3_attr, seq3
)
states = {**states, **_states} states = {**states, **_states}
wait_recording_done(hass) wait_recording_done(hass)
hist = history.get_significant_states( hist = history.get_significant_states(
@ -2114,7 +2123,8 @@ def test_compile_hourly_statistics_unchanged(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
) )
@ -2224,7 +2234,8 @@ def test_compile_hourly_statistics_unavailable(
four, states = record_states_partially_unavailable( four, states = record_states_partially_unavailable(
hass, zero, "sensor.test1", attributes hass, zero, "sensor.test1", attributes
) )
_, _states = record_states(hass, zero, "sensor.test2", attributes) with freeze_time(zero) as freezer:
_, _states = record_states(hass, freezer, zero, "sensor.test2", attributes)
states = {**states, **_states} states = {**states, **_states}
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -2440,16 +2451,17 @@ def test_compile_hourly_statistics_changing_units_1(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
attributes["unit_of_measurement"] = state_unit2 four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
four, _states = record_states( attributes["unit_of_measurement"] = state_unit2
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
four, _states = record_states( states["sensor.test1"] += _states["sensor.test1"]
hass, zero + timedelta(minutes=10), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
) )
@ -2565,12 +2577,13 @@ def test_compile_hourly_statistics_changing_units_2(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
attributes["unit_of_measurement"] = "cats" four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
four, _states = record_states( attributes["unit_of_measurement"] = "cats"
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
) )
@ -2640,16 +2653,17 @@ def test_compile_hourly_statistics_changing_units_3(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, _states = record_states( four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
attributes["unit_of_measurement"] = "cats" states["sensor.test1"] += _states["sensor.test1"]
four, _states = record_states( attributes["unit_of_measurement"] = "cats"
hass, zero + timedelta(minutes=10), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
) )
@ -2757,10 +2771,16 @@ def test_compile_hourly_statistics_convert_units_1(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit_1, "unit_of_measurement": state_unit_1,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, _states = record_states( four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
hass, zero + timedelta(minutes=5), "sensor.test1", attributes, seq=[0, 1, None] four, _states = record_states(
) hass,
freezer,
zero + timedelta(minutes=5),
"sensor.test1",
attributes,
seq=[0, 1, None],
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
do_adhoc_statistics(hass, start=zero) do_adhoc_statistics(hass, start=zero)
@ -2796,9 +2816,10 @@ def test_compile_hourly_statistics_convert_units_1(
} }
attributes["unit_of_measurement"] = state_unit_2 attributes["unit_of_measurement"] = state_unit_2
four, _states = record_states( with freeze_time(four) as freezer:
hass, zero + timedelta(minutes=10), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -2894,15 +2915,16 @@ def test_compile_hourly_statistics_equivalent_units_1(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
attributes["unit_of_measurement"] = state_unit2 four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
four, _states = record_states( attributes["unit_of_measurement"] = state_unit2
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
four, _states = record_states( states["sensor.test1"] += _states["sensor.test1"]
hass, zero + timedelta(minutes=10), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -3014,11 +3036,12 @@ def test_compile_hourly_statistics_equivalent_units_2(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
attributes["unit_of_measurement"] = state_unit2 four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
four, _states = record_states( attributes["unit_of_measurement"] = state_unit2
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -3104,7 +3127,8 @@ def test_compile_hourly_statistics_changing_device_class_1(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
do_adhoc_statistics(hass, start=zero) do_adhoc_statistics(hass, start=zero)
wait_recording_done(hass) wait_recording_done(hass)
@ -3140,13 +3164,14 @@ def test_compile_hourly_statistics_changing_device_class_1(
# Update device class and record additional states in the original UoM # Update device class and record additional states in the original UoM
attributes["device_class"] = device_class attributes["device_class"] = device_class
four, _states = record_states( with freeze_time(zero) as freezer:
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
four, _states = record_states( states["sensor.test1"] += _states["sensor.test1"]
hass, zero + timedelta(minutes=10), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -3197,13 +3222,14 @@ def test_compile_hourly_statistics_changing_device_class_1(
# Update device class and record additional states in a different UoM # Update device class and record additional states in a different UoM
attributes["unit_of_measurement"] = statistic_unit attributes["unit_of_measurement"] = statistic_unit
four, _states = record_states( with freeze_time(zero) as freezer:
hass, zero + timedelta(minutes=15), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=15), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
four, _states = record_states( states["sensor.test1"] += _states["sensor.test1"]
hass, zero + timedelta(minutes=20), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=20), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -3308,7 +3334,8 @@ def test_compile_hourly_statistics_changing_device_class_2(
"state_class": "measurement", "state_class": "measurement",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, zero, "sensor.test1", attributes) with freeze_time(zero) as freezer:
four, states = record_states(hass, freezer, zero, "sensor.test1", attributes)
do_adhoc_statistics(hass, start=zero) do_adhoc_statistics(hass, start=zero)
wait_recording_done(hass) wait_recording_done(hass)
@ -3344,13 +3371,14 @@ def test_compile_hourly_statistics_changing_device_class_2(
# Remove device class and record additional states # Remove device class and record additional states
attributes.pop("device_class") attributes.pop("device_class")
four, _states = record_states( with freeze_time(zero) as freezer:
hass, zero + timedelta(minutes=5), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes
states["sensor.test1"] += _states["sensor.test1"] )
four, _states = record_states( states["sensor.test1"] += _states["sensor.test1"]
hass, zero + timedelta(minutes=10), "sensor.test1", attributes four, _states = record_states(
) hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, zero, four, hass.states.async_entity_ids() hass, zero, four, hass.states.async_entity_ids()
@ -3445,7 +3473,10 @@ def test_compile_hourly_statistics_changing_state_class(
"state_class": "total_increasing", "state_class": "total_increasing",
"unit_of_measurement": state_unit, "unit_of_measurement": state_unit,
} }
four, states = record_states(hass, period0, "sensor.test1", attributes_1) with freeze_time(period0) as freezer:
four, states = record_states(
hass, freezer, period0, "sensor.test1", attributes_1
)
do_adhoc_statistics(hass, start=period0) do_adhoc_statistics(hass, start=period0)
wait_recording_done(hass) wait_recording_done(hass)
statistic_ids = list_statistic_ids(hass) statistic_ids = list_statistic_ids(hass)
@ -3477,7 +3508,10 @@ def test_compile_hourly_statistics_changing_state_class(
} }
# Add more states, with changed state class # Add more states, with changed state class
four, _states = record_states(hass, period1, "sensor.test1", attributes_2) with freeze_time(period1) as freezer:
four, _states = record_states(
hass, freezer, period1, "sensor.test1", attributes_2
)
states["sensor.test1"] += _states["sensor.test1"] states["sensor.test1"] += _states["sensor.test1"]
hist = history.get_significant_states( hist = history.get_significant_states(
hass, period0, four, hass.states.async_entity_ids() hass, period0, four, hass.states.async_entity_ids()
@ -3542,6 +3576,7 @@ def test_compile_hourly_statistics_changing_state_class(
assert "Error while processing event StatisticsTask" not in caplog.text assert "Error while processing event StatisticsTask" not in caplog.text
@pytest.mark.timeout(25)
def test_compile_statistics_hourly_daily_monthly_summary( def test_compile_statistics_hourly_daily_monthly_summary(
hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture
) -> None: ) -> None:
@ -3551,9 +3586,7 @@ def test_compile_statistics_hourly_daily_monthly_summary(
zero = zero.replace( zero = zero.replace(
year=2021, month=9, day=1, hour=5, minute=0, second=0, microsecond=0 year=2021, month=9, day=1, hour=5, minute=0, second=0, microsecond=0
) )
with patch( with freeze_time(zero):
"homeassistant.components.recorder.db_schema.dt_util.utcnow", return_value=zero
):
hass = hass_recorder() hass = hass_recorder()
# Remove this after dropping the use of the hass_recorder fixture # Remove this after dropping the use of the hass_recorder fixture
hass.config.set_time_zone("America/Regina") hass.config.set_time_zone("America/Regina")
@ -3618,55 +3651,69 @@ def test_compile_statistics_hourly_daily_monthly_summary(
"sensor.test4": None, "sensor.test4": None,
} }
start = zero start = zero
for i in range(24): with freeze_time(start) as freezer:
seq = [-10, 15, 30] for i in range(24):
# test1 has same value in every period seq = [-10, 15, 30]
four, _states = record_states(hass, start, "sensor.test1", attributes, seq) # test1 has same value in every period
states["sensor.test1"] += _states["sensor.test1"] four, _states = record_states(
last_state = last_states["sensor.test1"] hass, freezer, start, "sensor.test1", attributes, seq
expected_minima["sensor.test1"].append(_min(seq, last_state))
expected_maxima["sensor.test1"].append(_max(seq, last_state))
expected_averages["sensor.test1"].append(_weighted_average(seq, i, last_state))
last_states["sensor.test1"] = seq[-1]
# test2 values change: min/max at the last state
seq = [-10 * (i + 1), 15 * (i + 1), 30 * (i + 1)]
four, _states = record_states(hass, start, "sensor.test2", attributes, seq)
states["sensor.test2"] += _states["sensor.test2"]
last_state = last_states["sensor.test2"]
expected_minima["sensor.test2"].append(_min(seq, last_state))
expected_maxima["sensor.test2"].append(_max(seq, last_state))
expected_averages["sensor.test2"].append(_weighted_average(seq, i, last_state))
last_states["sensor.test2"] = seq[-1]
# test3 values change: min/max at the first state
seq = [-10 * (23 - i + 1), 15 * (23 - i + 1), 30 * (23 - i + 1)]
four, _states = record_states(hass, start, "sensor.test3", attributes, seq)
states["sensor.test3"] += _states["sensor.test3"]
last_state = last_states["sensor.test3"]
expected_minima["sensor.test3"].append(_min(seq, last_state))
expected_maxima["sensor.test3"].append(_max(seq, last_state))
expected_averages["sensor.test3"].append(_weighted_average(seq, i, last_state))
last_states["sensor.test3"] = seq[-1]
# test4 values grow
seq = [i, i + 0.5, i + 0.75]
start_meter = start
for j in range(len(seq)):
_states = record_meter_state(
hass,
start_meter,
"sensor.test4",
sum_attributes,
seq[j : j + 1],
) )
start_meter += timedelta(minutes=1) states["sensor.test1"] += _states["sensor.test1"]
states["sensor.test4"] += _states["sensor.test4"] last_state = last_states["sensor.test1"]
last_state = last_states["sensor.test4"] expected_minima["sensor.test1"].append(_min(seq, last_state))
expected_states["sensor.test4"].append(seq[-1]) expected_maxima["sensor.test1"].append(_max(seq, last_state))
expected_sums["sensor.test4"].append( expected_averages["sensor.test1"].append(
_sum(seq, last_state, expected_sums["sensor.test4"]) _weighted_average(seq, i, last_state)
) )
last_states["sensor.test4"] = seq[-1] last_states["sensor.test1"] = seq[-1]
# test2 values change: min/max at the last state
seq = [-10 * (i + 1), 15 * (i + 1), 30 * (i + 1)]
four, _states = record_states(
hass, freezer, start, "sensor.test2", attributes, seq
)
states["sensor.test2"] += _states["sensor.test2"]
last_state = last_states["sensor.test2"]
expected_minima["sensor.test2"].append(_min(seq, last_state))
expected_maxima["sensor.test2"].append(_max(seq, last_state))
expected_averages["sensor.test2"].append(
_weighted_average(seq, i, last_state)
)
last_states["sensor.test2"] = seq[-1]
# test3 values change: min/max at the first state
seq = [-10 * (23 - i + 1), 15 * (23 - i + 1), 30 * (23 - i + 1)]
four, _states = record_states(
hass, freezer, start, "sensor.test3", attributes, seq
)
states["sensor.test3"] += _states["sensor.test3"]
last_state = last_states["sensor.test3"]
expected_minima["sensor.test3"].append(_min(seq, last_state))
expected_maxima["sensor.test3"].append(_max(seq, last_state))
expected_averages["sensor.test3"].append(
_weighted_average(seq, i, last_state)
)
last_states["sensor.test3"] = seq[-1]
# test4 values grow
seq = [i, i + 0.5, i + 0.75]
start_meter = start
for j in range(len(seq)):
_states = record_meter_state(
hass,
freezer,
start_meter,
"sensor.test4",
sum_attributes,
seq[j : j + 1],
)
start_meter += timedelta(minutes=1)
states["sensor.test4"] += _states["sensor.test4"]
last_state = last_states["sensor.test4"]
expected_states["sensor.test4"].append(seq[-1])
expected_sums["sensor.test4"].append(
_sum(seq, last_state, expected_sums["sensor.test4"])
)
last_states["sensor.test4"] = seq[-1]
start += timedelta(minutes=5) start += timedelta(minutes=5)
hist = history.get_significant_states( hist = history.get_significant_states(
hass, hass,
zero - timedelta.resolution, zero - timedelta.resolution,
@ -3961,7 +4008,14 @@ def test_compile_statistics_hourly_daily_monthly_summary(
assert "Error while processing event StatisticsTask" not in caplog.text assert "Error while processing event StatisticsTask" not in caplog.text
def record_states(hass, zero, entity_id, attributes, seq=None): def record_states(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
zero: datetime,
entity_id: str,
attributes,
seq=None,
):
"""Record some test states. """Record some test states.
We inject a bunch of state updates for measurement sensors. We inject a bunch of state updates for measurement sensors.
@ -3982,26 +4036,14 @@ def record_states(hass, zero, entity_id, attributes, seq=None):
four = three + timedelta(seconds=10 * 5) four = three + timedelta(seconds=10 * 5)
states = {entity_id: []} states = {entity_id: []}
with patch( freezer.move_to(one)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one states[entity_id].append(set_state(entity_id, str(seq[0]), attributes=attributes))
):
states[entity_id].append(
set_state(entity_id, str(seq[0]), attributes=attributes)
)
with patch( freezer.move_to(two)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two states[entity_id].append(set_state(entity_id, str(seq[1]), attributes=attributes))
):
states[entity_id].append(
set_state(entity_id, str(seq[1]), attributes=attributes)
)
with patch( freezer.move_to(three)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three states[entity_id].append(set_state(entity_id, str(seq[2]), attributes=attributes))
):
states[entity_id].append(
set_state(entity_id, str(seq[2]), attributes=attributes)
)
return four, states return four, states
@ -4989,7 +5031,14 @@ async def test_validate_statistics_other_domain(
await assert_validation_result(client, {}) await assert_validation_result(client, {})
def record_meter_states(hass, zero, entity_id, _attributes, seq): def record_meter_states(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
zero: datetime,
entity_id: str,
_attributes,
seq,
):
"""Record some test states. """Record some test states.
We inject a bunch of state updates for meter sensors. We inject a bunch of state updates for meter sensors.
@ -5014,59 +5063,49 @@ def record_meter_states(hass, zero, entity_id, _attributes, seq):
attributes["last_reset"] = zero.isoformat() attributes["last_reset"] = zero.isoformat()
states = {entity_id: []} states = {entity_id: []}
with patch( freezer.move_to(zero)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero
):
states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
with patch( states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes))
with patch( freezer.move_to(one)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes))
with patch( freezer.move_to(two)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes)) freezer.move_to(three)
states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes))
attributes = dict(_attributes) attributes = dict(_attributes)
if "last_reset" in _attributes: if "last_reset" in _attributes:
attributes["last_reset"] = four.isoformat() attributes["last_reset"] = four.isoformat()
with patch( freezer.move_to(four)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=four states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes))
with patch( freezer.move_to(five)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=five states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes))
with patch( freezer.move_to(six)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=six states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes))
with patch( freezer.move_to(seven)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=seven states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes))
with patch( freezer.move_to(eight)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=eight states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes))
return four, eight, states return four, eight, states
def record_meter_state(hass, zero, entity_id, attributes, seq): def record_meter_state(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
zero: datetime,
entity_id: str,
attributes,
seq,
):
"""Record test state. """Record test state.
We inject a state update for meter sensor. We inject a state update for meter sensor.
@ -5079,10 +5118,8 @@ def record_meter_state(hass, zero, entity_id, attributes, seq):
return hass.states.get(entity_id) return hass.states.get(entity_id)
states = {entity_id: []} states = {entity_id: []}
with patch( freezer.move_to(zero)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
):
states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes))
return states return states
@ -5105,19 +5142,13 @@ def record_states_partially_unavailable(hass, zero, entity_id, attributes):
four = three + timedelta(seconds=15 * 5) four = three + timedelta(seconds=15 * 5)
states = {entity_id: []} states = {entity_id: []}
with patch( with freeze_time(one) as freezer:
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=one
):
states[entity_id].append(set_state(entity_id, "10", attributes=attributes)) states[entity_id].append(set_state(entity_id, "10", attributes=attributes))
with patch( freezer.move_to(two)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=two
):
states[entity_id].append(set_state(entity_id, "25", attributes=attributes)) states[entity_id].append(set_state(entity_id, "25", attributes=attributes))
with patch( freezer.move_to(three)
"homeassistant.components.recorder.core.dt_util.utcnow", return_value=three
):
states[entity_id].append( states[entity_id].append(
set_state(entity_id, STATE_UNAVAILABLE, attributes=attributes) set_state(entity_id, STATE_UNAVAILABLE, attributes=attributes)
) )