From 7ec6e03d5d9a5c80c93d5f9e51e788a7108585c5 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Wed, 24 May 2023 19:54:08 -0500 Subject: [PATCH] Update sensor tests to avoid patching utcnow (#93497) --- tests/components/sensor/test_recorder.py | 653 ++++++++++++----------- 1 file changed, 342 insertions(+), 311 deletions(-) diff --git a/tests/components/sensor/test_recorder.py b/tests/components/sensor/test_recorder.py index 53fb2493d70..65b0a0b9485 100644 --- a/tests/components/sensor/test_recorder.py +++ b/tests/components/sensor/test_recorder.py @@ -7,6 +7,8 @@ import math from statistics import mean from unittest.mock import patch +from freezegun import freeze_time +from freezegun.api import FrozenDateTimeFactory import pytest from homeassistant import loader @@ -155,7 +157,8 @@ def test_compile_hourly_statistics( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() ) @@ -251,17 +254,13 @@ def test_compile_hourly_statistics_with_some_same_last_updated( four = three + timedelta(seconds=10 * 5) states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one - ): + with freeze_time(one) as freezer: states[entity_id].append( set_state(entity_id, str(seq[0]), attributes=attributes) ) - # Record two states at the exact same time - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two - ): + # Record two states at the exact same time + freezer.move_to(two) states[entity_id].append( set_state(entity_id, str(seq[1]), attributes=attributes) ) @@ -269,9 +268,7 @@ def test_compile_hourly_statistics_with_some_same_last_updated( set_state(entity_id, str(seq[2]), attributes=attributes) ) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three - ): + freezer.move_to(three) states[entity_id].append( set_state(entity_id, str(seq[3]), attributes=attributes) ) @@ -371,9 +368,7 @@ def test_compile_hourly_statistics_with_all_same_last_updated( four = three + timedelta(seconds=10 * 5) states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two - ): + with freeze_time(two): states[entity_id].append( set_state(entity_id, str(seq[0]), attributes=attributes) ) @@ -480,9 +475,7 @@ def test_compile_hourly_statistics_only_state_is_and_end_of_period( end = zero + timedelta(minutes=5) states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=end - ): + with freeze_time(end): states[entity_id].append( set_state(entity_id, str(seq[0]), attributes=attributes) ) @@ -559,7 +552,8 @@ def test_compile_hourly_statistics_purged_state_changes( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() ) @@ -568,9 +562,7 @@ def test_compile_hourly_statistics_purged_state_changes( mean = min = max = float(hist["sensor.test1"][-1].state) # Purge all states from the database - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=four - ): + with freeze_time(four): hass.services.call("recorder", "purge", {"keep_days": 0}) hass.block_till_done() wait_recording_done(hass) @@ -623,31 +615,32 @@ def test_compile_hourly_statistics_wrong_unit( hass = hass_recorder() setup_component(hass, "sensor", {}) wait_recording_done(hass) # Wait for the sensor recorder platform to be added - four, states = record_states(hass, zero, "sensor.test1", attributes) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) - attributes_tmp = dict(attributes) - attributes_tmp["unit_of_measurement"] = "invalid" - _, _states = record_states(hass, zero, "sensor.test2", attributes_tmp) - states = {**states, **_states} - attributes_tmp.pop("unit_of_measurement") - _, _states = record_states(hass, zero, "sensor.test3", attributes_tmp) - states = {**states, **_states} + attributes_tmp = dict(attributes) + attributes_tmp["unit_of_measurement"] = "invalid" + _, _states = record_states(hass, freezer, zero, "sensor.test2", attributes_tmp) + states = {**states, **_states} + attributes_tmp.pop("unit_of_measurement") + _, _states = record_states(hass, freezer, zero, "sensor.test3", attributes_tmp) + states = {**states, **_states} - attributes_tmp = dict(attributes) - attributes_tmp["state_class"] = "invalid" - _, _states = record_states(hass, zero, "sensor.test4", attributes_tmp) - states = {**states, **_states} - attributes_tmp.pop("state_class") - _, _states = record_states(hass, zero, "sensor.test5", attributes_tmp) - states = {**states, **_states} + attributes_tmp = dict(attributes) + attributes_tmp["state_class"] = "invalid" + _, _states = record_states(hass, freezer, zero, "sensor.test4", attributes_tmp) + states = {**states, **_states} + attributes_tmp.pop("state_class") + _, _states = record_states(hass, freezer, zero, "sensor.test5", attributes_tmp) + states = {**states, **_states} - attributes_tmp = dict(attributes) - attributes_tmp["device_class"] = "invalid" - _, _states = record_states(hass, zero, "sensor.test6", attributes_tmp) - states = {**states, **_states} - attributes_tmp.pop("device_class") - _, _states = record_states(hass, zero, "sensor.test7", attributes_tmp) - states = {**states, **_states} + attributes_tmp = dict(attributes) + attributes_tmp["device_class"] = "invalid" + _, _states = record_states(hass, freezer, zero, "sensor.test6", attributes_tmp) + states = {**states, **_states} + attributes_tmp.pop("device_class") + _, _states = record_states(hass, freezer, zero, "sensor.test7", attributes_tmp) + states = {**states, **_states} hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -832,7 +825,6 @@ async def test_compile_hourly_sum_statistics_amount( period0_end = period1 = period0 + timedelta(minutes=5) period1_end = period2 = period0 + timedelta(minutes=10) period2_end = period0 + timedelta(minutes=15) - client = await hass_ws_client() hass.config.units = units await async_setup_component(hass, "sensor", {}) # Wait for the sensor recorder platform to be added @@ -844,10 +836,10 @@ async def test_compile_hourly_sum_statistics_amount( "last_reset": None, } seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] - - four, eight, states = await hass.async_add_executor_job( - record_meter_states, hass, period0, "sensor.test1", attributes, seq - ) + with freeze_time(period0) as freezer: + four, eight, states = await hass.async_add_executor_job( + record_meter_states, hass, freezer, period0, "sensor.test1", attributes, seq + ) await async_wait_recording_done(hass) hist = history.get_significant_states( hass, @@ -938,6 +930,8 @@ async def test_compile_hourly_sum_statistics_amount( assert "Compiling initial sum statistics for sensor.test1" in caplog.text assert "Detected new cycle for sensor.test1, value dropped" not in caplog.text + client = await hass_ws_client() + # Adjust the inserted statistics await client.send_json( { @@ -1028,28 +1022,28 @@ def test_compile_hourly_sum_statistics_amount_reset_every_state_change( assert seq[0] != seq[-1] states = {"sensor.test1": []} + with freeze_time(zero) as freezer: + # Insert states for a 1st statistics period + one = zero + for i in range(len(seq)): + one = one + timedelta(seconds=5) + attributes = dict(attributes) + attributes["last_reset"] = dt_util.as_local(one).isoformat() + _states = record_meter_state( + hass, freezer, one, "sensor.test1", attributes, seq[i : i + 1] + ) + states["sensor.test1"].extend(_states["sensor.test1"]) - # Insert states for a 1st statistics period - one = zero - for i in range(len(seq)): - one = one + timedelta(seconds=5) - attributes = dict(attributes) - attributes["last_reset"] = dt_util.as_local(one).isoformat() - _states = record_meter_state( - hass, one, "sensor.test1", attributes, seq[i : i + 1] - ) - states["sensor.test1"].extend(_states["sensor.test1"]) - - # Insert states for a 2nd statistics period - two = zero + timedelta(minutes=5) - for i in range(len(seq)): - two = two + timedelta(seconds=5) - attributes = dict(attributes) - attributes["last_reset"] = dt_util.as_local(two).isoformat() - _states = record_meter_state( - hass, two, "sensor.test1", attributes, seq[i : i + 1] - ) - states["sensor.test1"].extend(_states["sensor.test1"]) + # Insert states for a 2nd statistics period + two = zero + timedelta(minutes=5) + for i in range(len(seq)): + two = two + timedelta(seconds=5) + attributes = dict(attributes) + attributes["last_reset"] = dt_util.as_local(two).isoformat() + _states = record_meter_state( + hass, freezer, two, "sensor.test1", attributes, seq[i : i + 1] + ) + states["sensor.test1"].extend(_states["sensor.test1"]) hist = history.get_significant_states( hass, @@ -1147,17 +1141,18 @@ def test_compile_hourly_sum_statistics_amount_invalid_last_reset( states = {"sensor.test1": []} # Insert states - one = zero - for i in range(len(seq)): - one = one + timedelta(seconds=5) - attributes = dict(attributes) - attributes["last_reset"] = dt_util.as_local(one).isoformat() - if i == 3: - attributes["last_reset"] = "festivus" # not a valid time - _states = record_meter_state( - hass, one, "sensor.test1", attributes, seq[i : i + 1] - ) - states["sensor.test1"].extend(_states["sensor.test1"]) + with freeze_time(zero) as freezer: + one = zero + for i in range(len(seq)): + one = one + timedelta(seconds=5) + attributes = dict(attributes) + attributes["last_reset"] = dt_util.as_local(one).isoformat() + if i == 3: + attributes["last_reset"] = "festivus" # not a valid time + _states = record_meter_state( + hass, freezer, one, "sensor.test1", attributes, seq[i : i + 1] + ) + states["sensor.test1"].extend(_states["sensor.test1"]) hist = history.get_significant_states( hass, @@ -1243,15 +1238,16 @@ def test_compile_hourly_sum_statistics_nan_inf_state( seq = [10, math.nan, 15, 15, 20, math.inf, 20, 10] states = {"sensor.test1": []} - one = zero - for i in range(len(seq)): - one = one + timedelta(seconds=5) - attributes = dict(attributes) - attributes["last_reset"] = dt_util.as_local(one).isoformat() - _states = record_meter_state( - hass, one, "sensor.test1", attributes, seq[i : i + 1] - ) - states["sensor.test1"].extend(_states["sensor.test1"]) + with freeze_time(zero) as freezer: + one = zero + for i in range(len(seq)): + one = one + timedelta(seconds=5) + attributes = dict(attributes) + attributes["last_reset"] = dt_util.as_local(one).isoformat() + _states = record_meter_state( + hass, freezer, one, "sensor.test1", attributes, seq[i : i + 1] + ) + states["sensor.test1"].extend(_states["sensor.test1"]) hist = history.get_significant_states( hass, @@ -1391,10 +1387,13 @@ def test_compile_hourly_sum_statistics_negative_state( states[entity_id].append(state) offending_state = 6 one = zero - for i in range(len(seq)): - one = one + timedelta(seconds=5) - _states = record_meter_state(hass, one, entity_id, attributes, seq[i : i + 1]) - states[entity_id].extend(_states[entity_id]) + with freeze_time(zero) as freezer: + for i in range(len(seq)): + one = one + timedelta(seconds=5) + _states = record_meter_state( + hass, freezer, one, entity_id, attributes, seq[i : i + 1] + ) + states[entity_id].extend(_states[entity_id]) hist = history.get_significant_states( hass, @@ -1486,10 +1485,10 @@ def test_compile_hourly_sum_statistics_total_no_reset( "unit_of_measurement": state_unit, } seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] - - four, eight, states = record_meter_states( - hass, period0, "sensor.test1", attributes, seq - ) + with freeze_time(period0) as freezer: + four, eight, states = record_meter_states( + hass, freezer, period0, "sensor.test1", attributes, seq + ) wait_recording_done(hass) hist = history.get_significant_states( hass, @@ -1598,10 +1597,10 @@ def test_compile_hourly_sum_statistics_total_increasing( "unit_of_measurement": state_unit, } seq = [10, 15, 20, 10, 30, 40, 50, 60, 70] - - four, eight, states = record_meter_states( - hass, period0, "sensor.test1", attributes, seq - ) + with freeze_time(period0) as freezer: + four, eight, states = record_meter_states( + hass, freezer, period0, "sensor.test1", attributes, seq + ) wait_recording_done(hass) hist = history.get_significant_states( hass, @@ -1708,10 +1707,10 @@ def test_compile_hourly_sum_statistics_total_increasing_small_dip( "unit_of_measurement": state_unit, } seq = [10, 15, 20, 19, 30, 40, 39, 60, 70] - - four, eight, states = record_meter_states( - hass, period0, "sensor.test1", attributes, seq - ) + with freeze_time(period0) as freezer: + four, eight, states = record_meter_states( + hass, freezer, period0, "sensor.test1", attributes, seq + ) wait_recording_done(hass) hist = history.get_significant_states( hass, @@ -1816,12 +1815,17 @@ def test_compile_hourly_energy_statistics_unsupported( seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75] seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90] - four, eight, states = record_meter_states( - hass, period0, "sensor.test1", sns1_attr, seq1 - ) - _, _, _states = record_meter_states(hass, period0, "sensor.test2", sns2_attr, seq2) - states = {**states, **_states} - _, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3) + with freeze_time(period0) as freezer: + four, eight, states = record_meter_states( + hass, freezer, period0, "sensor.test1", sns1_attr, seq1 + ) + _, _, _states = record_meter_states( + hass, freezer, period0, "sensor.test2", sns2_attr, seq2 + ) + states = {**states, **_states} + _, _, _states = record_meter_states( + hass, freezer, period0, "sensor.test3", sns3_attr, seq3 + ) states = {**states, **_states} wait_recording_done(hass) @@ -1914,12 +1918,17 @@ def test_compile_hourly_energy_statistics_multiple( seq2 = [110, 120, 130, 0, 30, 45, 55, 65, 75] seq3 = [0, 0, 5, 10, 30, 50, 60, 80, 90] - four, eight, states = record_meter_states( - hass, period0, "sensor.test1", sns1_attr, seq1 - ) - _, _, _states = record_meter_states(hass, period0, "sensor.test2", sns2_attr, seq2) - states = {**states, **_states} - _, _, _states = record_meter_states(hass, period0, "sensor.test3", sns3_attr, seq3) + with freeze_time(period0) as freezer: + four, eight, states = record_meter_states( + hass, freezer, period0, "sensor.test1", sns1_attr, seq1 + ) + _, _, _states = record_meter_states( + hass, freezer, period0, "sensor.test2", sns2_attr, seq2 + ) + states = {**states, **_states} + _, _, _states = record_meter_states( + hass, freezer, period0, "sensor.test3", sns3_attr, seq3 + ) states = {**states, **_states} wait_recording_done(hass) hist = history.get_significant_states( @@ -2114,7 +2123,8 @@ def test_compile_hourly_statistics_unchanged( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() ) @@ -2224,7 +2234,8 @@ def test_compile_hourly_statistics_unavailable( four, states = record_states_partially_unavailable( hass, zero, "sensor.test1", attributes ) - _, _states = record_states(hass, zero, "sensor.test2", attributes) + with freeze_time(zero) as freezer: + _, _states = record_states(hass, freezer, zero, "sensor.test2", attributes) states = {**states, **_states} hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -2440,16 +2451,17 @@ def test_compile_hourly_statistics_changing_units_1( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) - attributes["unit_of_measurement"] = state_unit2 - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] - four, _states = record_states( - hass, zero + timedelta(minutes=10), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) + attributes["unit_of_measurement"] = state_unit2 + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() ) @@ -2565,12 +2577,13 @@ def test_compile_hourly_statistics_changing_units_2( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) - attributes["unit_of_measurement"] = "cats" - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) + attributes["unit_of_measurement"] = "cats" + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() ) @@ -2640,16 +2653,17 @@ def test_compile_hourly_statistics_changing_units_3( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] - attributes["unit_of_measurement"] = "cats" - four, _states = record_states( - hass, zero + timedelta(minutes=10), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] + attributes["unit_of_measurement"] = "cats" + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() ) @@ -2757,10 +2771,16 @@ def test_compile_hourly_statistics_convert_units_1( "state_class": "measurement", "unit_of_measurement": state_unit_1, } - four, states = record_states(hass, zero, "sensor.test1", attributes) - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes, seq=[0, 1, None] - ) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) + four, _states = record_states( + hass, + freezer, + zero + timedelta(minutes=5), + "sensor.test1", + attributes, + seq=[0, 1, None], + ) states["sensor.test1"] += _states["sensor.test1"] do_adhoc_statistics(hass, start=zero) @@ -2796,9 +2816,10 @@ def test_compile_hourly_statistics_convert_units_1( } attributes["unit_of_measurement"] = state_unit_2 - four, _states = record_states( - hass, zero + timedelta(minutes=10), "sensor.test1", attributes - ) + with freeze_time(four) as freezer: + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -2894,15 +2915,16 @@ def test_compile_hourly_statistics_equivalent_units_1( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) - attributes["unit_of_measurement"] = state_unit2 - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] - four, _states = record_states( - hass, zero + timedelta(minutes=10), "sensor.test1", attributes - ) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) + attributes["unit_of_measurement"] = state_unit2 + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -3014,11 +3036,12 @@ def test_compile_hourly_statistics_equivalent_units_2( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) - attributes["unit_of_measurement"] = state_unit2 - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) + attributes["unit_of_measurement"] = state_unit2 + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -3104,7 +3127,8 @@ def test_compile_hourly_statistics_changing_device_class_1( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) do_adhoc_statistics(hass, start=zero) wait_recording_done(hass) @@ -3140,13 +3164,14 @@ def test_compile_hourly_statistics_changing_device_class_1( # Update device class and record additional states in the original UoM attributes["device_class"] = device_class - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] - four, _states = record_states( - hass, zero + timedelta(minutes=10), "sensor.test1", attributes - ) + with freeze_time(zero) as freezer: + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -3197,13 +3222,14 @@ def test_compile_hourly_statistics_changing_device_class_1( # Update device class and record additional states in a different UoM attributes["unit_of_measurement"] = statistic_unit - four, _states = record_states( - hass, zero + timedelta(minutes=15), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] - four, _states = record_states( - hass, zero + timedelta(minutes=20), "sensor.test1", attributes - ) + with freeze_time(zero) as freezer: + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=15), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=20), "sensor.test1", attributes + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -3308,7 +3334,8 @@ def test_compile_hourly_statistics_changing_device_class_2( "state_class": "measurement", "unit_of_measurement": state_unit, } - four, states = record_states(hass, zero, "sensor.test1", attributes) + with freeze_time(zero) as freezer: + four, states = record_states(hass, freezer, zero, "sensor.test1", attributes) do_adhoc_statistics(hass, start=zero) wait_recording_done(hass) @@ -3344,13 +3371,14 @@ def test_compile_hourly_statistics_changing_device_class_2( # Remove device class and record additional states attributes.pop("device_class") - four, _states = record_states( - hass, zero + timedelta(minutes=5), "sensor.test1", attributes - ) - states["sensor.test1"] += _states["sensor.test1"] - four, _states = record_states( - hass, zero + timedelta(minutes=10), "sensor.test1", attributes - ) + with freeze_time(zero) as freezer: + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=5), "sensor.test1", attributes + ) + states["sensor.test1"] += _states["sensor.test1"] + four, _states = record_states( + hass, freezer, zero + timedelta(minutes=10), "sensor.test1", attributes + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, zero, four, hass.states.async_entity_ids() @@ -3445,7 +3473,10 @@ def test_compile_hourly_statistics_changing_state_class( "state_class": "total_increasing", "unit_of_measurement": state_unit, } - four, states = record_states(hass, period0, "sensor.test1", attributes_1) + with freeze_time(period0) as freezer: + four, states = record_states( + hass, freezer, period0, "sensor.test1", attributes_1 + ) do_adhoc_statistics(hass, start=period0) wait_recording_done(hass) statistic_ids = list_statistic_ids(hass) @@ -3477,7 +3508,10 @@ def test_compile_hourly_statistics_changing_state_class( } # Add more states, with changed state class - four, _states = record_states(hass, period1, "sensor.test1", attributes_2) + with freeze_time(period1) as freezer: + four, _states = record_states( + hass, freezer, period1, "sensor.test1", attributes_2 + ) states["sensor.test1"] += _states["sensor.test1"] hist = history.get_significant_states( hass, period0, four, hass.states.async_entity_ids() @@ -3542,6 +3576,7 @@ def test_compile_hourly_statistics_changing_state_class( assert "Error while processing event StatisticsTask" not in caplog.text +@pytest.mark.timeout(25) def test_compile_statistics_hourly_daily_monthly_summary( hass_recorder: Callable[..., HomeAssistant], caplog: pytest.LogCaptureFixture ) -> None: @@ -3551,9 +3586,7 @@ def test_compile_statistics_hourly_daily_monthly_summary( zero = zero.replace( year=2021, month=9, day=1, hour=5, minute=0, second=0, microsecond=0 ) - with patch( - "homeassistant.components.recorder.db_schema.dt_util.utcnow", return_value=zero - ): + with freeze_time(zero): hass = hass_recorder() # Remove this after dropping the use of the hass_recorder fixture hass.config.set_time_zone("America/Regina") @@ -3618,55 +3651,69 @@ def test_compile_statistics_hourly_daily_monthly_summary( "sensor.test4": None, } start = zero - for i in range(24): - seq = [-10, 15, 30] - # test1 has same value in every period - four, _states = record_states(hass, start, "sensor.test1", attributes, seq) - states["sensor.test1"] += _states["sensor.test1"] - last_state = last_states["sensor.test1"] - expected_minima["sensor.test1"].append(_min(seq, last_state)) - expected_maxima["sensor.test1"].append(_max(seq, last_state)) - expected_averages["sensor.test1"].append(_weighted_average(seq, i, last_state)) - last_states["sensor.test1"] = seq[-1] - # test2 values change: min/max at the last state - seq = [-10 * (i + 1), 15 * (i + 1), 30 * (i + 1)] - four, _states = record_states(hass, start, "sensor.test2", attributes, seq) - states["sensor.test2"] += _states["sensor.test2"] - last_state = last_states["sensor.test2"] - expected_minima["sensor.test2"].append(_min(seq, last_state)) - expected_maxima["sensor.test2"].append(_max(seq, last_state)) - expected_averages["sensor.test2"].append(_weighted_average(seq, i, last_state)) - last_states["sensor.test2"] = seq[-1] - # test3 values change: min/max at the first state - seq = [-10 * (23 - i + 1), 15 * (23 - i + 1), 30 * (23 - i + 1)] - four, _states = record_states(hass, start, "sensor.test3", attributes, seq) - states["sensor.test3"] += _states["sensor.test3"] - last_state = last_states["sensor.test3"] - expected_minima["sensor.test3"].append(_min(seq, last_state)) - expected_maxima["sensor.test3"].append(_max(seq, last_state)) - expected_averages["sensor.test3"].append(_weighted_average(seq, i, last_state)) - last_states["sensor.test3"] = seq[-1] - # test4 values grow - seq = [i, i + 0.5, i + 0.75] - start_meter = start - for j in range(len(seq)): - _states = record_meter_state( - hass, - start_meter, - "sensor.test4", - sum_attributes, - seq[j : j + 1], + with freeze_time(start) as freezer: + for i in range(24): + seq = [-10, 15, 30] + # test1 has same value in every period + four, _states = record_states( + hass, freezer, start, "sensor.test1", attributes, seq ) - start_meter += timedelta(minutes=1) - states["sensor.test4"] += _states["sensor.test4"] - last_state = last_states["sensor.test4"] - expected_states["sensor.test4"].append(seq[-1]) - expected_sums["sensor.test4"].append( - _sum(seq, last_state, expected_sums["sensor.test4"]) - ) - last_states["sensor.test4"] = seq[-1] + states["sensor.test1"] += _states["sensor.test1"] + last_state = last_states["sensor.test1"] + expected_minima["sensor.test1"].append(_min(seq, last_state)) + expected_maxima["sensor.test1"].append(_max(seq, last_state)) + expected_averages["sensor.test1"].append( + _weighted_average(seq, i, last_state) + ) + last_states["sensor.test1"] = seq[-1] + # test2 values change: min/max at the last state + seq = [-10 * (i + 1), 15 * (i + 1), 30 * (i + 1)] + four, _states = record_states( + hass, freezer, start, "sensor.test2", attributes, seq + ) + states["sensor.test2"] += _states["sensor.test2"] + last_state = last_states["sensor.test2"] + expected_minima["sensor.test2"].append(_min(seq, last_state)) + expected_maxima["sensor.test2"].append(_max(seq, last_state)) + expected_averages["sensor.test2"].append( + _weighted_average(seq, i, last_state) + ) + last_states["sensor.test2"] = seq[-1] + # test3 values change: min/max at the first state + seq = [-10 * (23 - i + 1), 15 * (23 - i + 1), 30 * (23 - i + 1)] + four, _states = record_states( + hass, freezer, start, "sensor.test3", attributes, seq + ) + states["sensor.test3"] += _states["sensor.test3"] + last_state = last_states["sensor.test3"] + expected_minima["sensor.test3"].append(_min(seq, last_state)) + expected_maxima["sensor.test3"].append(_max(seq, last_state)) + expected_averages["sensor.test3"].append( + _weighted_average(seq, i, last_state) + ) + last_states["sensor.test3"] = seq[-1] + # test4 values grow + seq = [i, i + 0.5, i + 0.75] + start_meter = start + for j in range(len(seq)): + _states = record_meter_state( + hass, + freezer, + start_meter, + "sensor.test4", + sum_attributes, + seq[j : j + 1], + ) + start_meter += timedelta(minutes=1) + states["sensor.test4"] += _states["sensor.test4"] + last_state = last_states["sensor.test4"] + expected_states["sensor.test4"].append(seq[-1]) + expected_sums["sensor.test4"].append( + _sum(seq, last_state, expected_sums["sensor.test4"]) + ) + last_states["sensor.test4"] = seq[-1] - start += timedelta(minutes=5) + start += timedelta(minutes=5) hist = history.get_significant_states( hass, zero - timedelta.resolution, @@ -3961,7 +4008,14 @@ def test_compile_statistics_hourly_daily_monthly_summary( assert "Error while processing event StatisticsTask" not in caplog.text -def record_states(hass, zero, entity_id, attributes, seq=None): +def record_states( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + zero: datetime, + entity_id: str, + attributes, + seq=None, +): """Record some test states. We inject a bunch of state updates for measurement sensors. @@ -3982,26 +4036,14 @@ def record_states(hass, zero, entity_id, attributes, seq=None): four = three + timedelta(seconds=10 * 5) states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one - ): - states[entity_id].append( - set_state(entity_id, str(seq[0]), attributes=attributes) - ) + freezer.move_to(one) + states[entity_id].append(set_state(entity_id, str(seq[0]), attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two - ): - states[entity_id].append( - set_state(entity_id, str(seq[1]), attributes=attributes) - ) + freezer.move_to(two) + states[entity_id].append(set_state(entity_id, str(seq[1]), attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three - ): - states[entity_id].append( - set_state(entity_id, str(seq[2]), attributes=attributes) - ) + freezer.move_to(three) + states[entity_id].append(set_state(entity_id, str(seq[2]), attributes=attributes)) return four, states @@ -4989,7 +5031,14 @@ async def test_validate_statistics_other_domain( await assert_validation_result(client, {}) -def record_meter_states(hass, zero, entity_id, _attributes, seq): +def record_meter_states( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + zero: datetime, + entity_id: str, + _attributes, + seq, +): """Record some test states. We inject a bunch of state updates for meter sensors. @@ -5014,59 +5063,49 @@ def record_meter_states(hass, zero, entity_id, _attributes, seq): attributes["last_reset"] = zero.isoformat() states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero - ): - states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes)) + freezer.move_to(zero) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one - ): - states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes)) + states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two - ): - states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes)) + freezer.move_to(one) + states[entity_id].append(set_state(entity_id, seq[1], attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three - ): - states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes)) + freezer.move_to(two) + states[entity_id].append(set_state(entity_id, seq[2], attributes=attributes)) + + freezer.move_to(three) + states[entity_id].append(set_state(entity_id, seq[3], attributes=attributes)) attributes = dict(_attributes) if "last_reset" in _attributes: attributes["last_reset"] = four.isoformat() - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=four - ): - states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes)) + freezer.move_to(four) + states[entity_id].append(set_state(entity_id, seq[4], attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=five - ): - states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes)) + freezer.move_to(five) + states[entity_id].append(set_state(entity_id, seq[5], attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=six - ): - states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes)) + freezer.move_to(six) + states[entity_id].append(set_state(entity_id, seq[6], attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=seven - ): - states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes)) + freezer.move_to(seven) + states[entity_id].append(set_state(entity_id, seq[7], attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=eight - ): - states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes)) + freezer.move_to(eight) + states[entity_id].append(set_state(entity_id, seq[8], attributes=attributes)) return four, eight, states -def record_meter_state(hass, zero, entity_id, attributes, seq): +def record_meter_state( + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + zero: datetime, + entity_id: str, + attributes, + seq, +): """Record test state. We inject a state update for meter sensor. @@ -5079,10 +5118,8 @@ def record_meter_state(hass, zero, entity_id, attributes, seq): return hass.states.get(entity_id) states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=zero - ): - states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes)) + freezer.move_to(zero) + states[entity_id].append(set_state(entity_id, seq[0], attributes=attributes)) return states @@ -5105,19 +5142,13 @@ def record_states_partially_unavailable(hass, zero, entity_id, attributes): four = three + timedelta(seconds=15 * 5) states = {entity_id: []} - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=one - ): + with freeze_time(one) as freezer: states[entity_id].append(set_state(entity_id, "10", attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=two - ): + freezer.move_to(two) states[entity_id].append(set_state(entity_id, "25", attributes=attributes)) - with patch( - "homeassistant.components.recorder.core.dt_util.utcnow", return_value=three - ): + freezer.move_to(three) states[entity_id].append( set_state(entity_id, STATE_UNAVAILABLE, attributes=attributes) )