mirror of
https://github.com/home-assistant/core.git
synced 2025-07-26 22:57:17 +00:00
Fix history_stats with sliding window that ends before now (#145117)
This commit is contained in:
parent
5031ffe767
commit
7e895f7d10
@ -60,6 +60,9 @@ class HistoryStats:
|
|||||||
self._start = start
|
self._start = start
|
||||||
self._end = end
|
self._end = end
|
||||||
|
|
||||||
|
self._pending_events: list[Event[EventStateChangedData]] = []
|
||||||
|
self._query_count = 0
|
||||||
|
|
||||||
async def async_update(
|
async def async_update(
|
||||||
self, event: Event[EventStateChangedData] | None
|
self, event: Event[EventStateChangedData] | None
|
||||||
) -> HistoryStatsState:
|
) -> HistoryStatsState:
|
||||||
@ -85,6 +88,14 @@ class HistoryStats:
|
|||||||
utc_now = dt_util.utcnow()
|
utc_now = dt_util.utcnow()
|
||||||
now_timestamp = floored_timestamp(utc_now)
|
now_timestamp = floored_timestamp(utc_now)
|
||||||
|
|
||||||
|
# If we end up querying data from the recorder when we get triggered by a new state
|
||||||
|
# change event, it is possible this function could be reentered a second time before
|
||||||
|
# the first recorder query returns. In that case a second recorder query will be done
|
||||||
|
# and we need to hold the new event so that we can append it after the second query.
|
||||||
|
# Otherwise the event will be dropped.
|
||||||
|
if event:
|
||||||
|
self._pending_events.append(event)
|
||||||
|
|
||||||
if current_period_start_timestamp > now_timestamp:
|
if current_period_start_timestamp > now_timestamp:
|
||||||
# History cannot tell the future
|
# History cannot tell the future
|
||||||
self._history_current_period = []
|
self._history_current_period = []
|
||||||
@ -113,15 +124,14 @@ class HistoryStats:
|
|||||||
start_changed = (
|
start_changed = (
|
||||||
current_period_start_timestamp != previous_period_start_timestamp
|
current_period_start_timestamp != previous_period_start_timestamp
|
||||||
)
|
)
|
||||||
|
end_changed = current_period_end_timestamp != previous_period_end_timestamp
|
||||||
if start_changed:
|
if start_changed:
|
||||||
self._prune_history_cache(current_period_start_timestamp)
|
self._prune_history_cache(current_period_start_timestamp)
|
||||||
|
|
||||||
new_data = False
|
new_data = False
|
||||||
if event and (new_state := event.data["new_state"]) is not None:
|
if event and (new_state := event.data["new_state"]) is not None:
|
||||||
if (
|
if current_period_start_timestamp <= floored_timestamp(
|
||||||
current_period_start_timestamp
|
new_state.last_changed
|
||||||
<= floored_timestamp(new_state.last_changed)
|
|
||||||
<= current_period_end_timestamp
|
|
||||||
):
|
):
|
||||||
self._history_current_period.append(
|
self._history_current_period.append(
|
||||||
HistoryState(new_state.state, new_state.last_changed_timestamp)
|
HistoryState(new_state.state, new_state.last_changed_timestamp)
|
||||||
@ -131,26 +141,31 @@ class HistoryStats:
|
|||||||
not new_data
|
not new_data
|
||||||
and current_period_end_timestamp < now_timestamp
|
and current_period_end_timestamp < now_timestamp
|
||||||
and not start_changed
|
and not start_changed
|
||||||
|
and not end_changed
|
||||||
):
|
):
|
||||||
# If period has not changed and current time after the period end...
|
# If period has not changed and current time after the period end...
|
||||||
# Don't compute anything as the value cannot have changed
|
# Don't compute anything as the value cannot have changed
|
||||||
return self._state
|
return self._state
|
||||||
else:
|
else:
|
||||||
await self._async_history_from_db(
|
await self._async_history_from_db(
|
||||||
current_period_start_timestamp, current_period_end_timestamp
|
current_period_start_timestamp, now_timestamp
|
||||||
)
|
)
|
||||||
if event and (new_state := event.data["new_state"]) is not None:
|
for pending_event in self._pending_events:
|
||||||
if (
|
if (new_state := pending_event.data["new_state"]) is not None:
|
||||||
current_period_start_timestamp
|
if current_period_start_timestamp <= floored_timestamp(
|
||||||
<= floored_timestamp(new_state.last_changed)
|
new_state.last_changed
|
||||||
<= current_period_end_timestamp
|
|
||||||
):
|
):
|
||||||
self._history_current_period.append(
|
self._history_current_period.append(
|
||||||
HistoryState(new_state.state, new_state.last_changed_timestamp)
|
HistoryState(
|
||||||
|
new_state.state, new_state.last_changed_timestamp
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
self._has_recorder_data = True
|
self._has_recorder_data = True
|
||||||
|
|
||||||
|
if self._query_count == 0:
|
||||||
|
self._pending_events.clear()
|
||||||
|
|
||||||
seconds_matched, match_count = self._async_compute_seconds_and_changes(
|
seconds_matched, match_count = self._async_compute_seconds_and_changes(
|
||||||
now_timestamp,
|
now_timestamp,
|
||||||
current_period_start_timestamp,
|
current_period_start_timestamp,
|
||||||
@ -165,12 +180,16 @@ class HistoryStats:
|
|||||||
current_period_end_timestamp: float,
|
current_period_end_timestamp: float,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Update history data for the current period from the database."""
|
"""Update history data for the current period from the database."""
|
||||||
|
self._query_count += 1
|
||||||
|
try:
|
||||||
instance = get_instance(self.hass)
|
instance = get_instance(self.hass)
|
||||||
states = await instance.async_add_executor_job(
|
states = await instance.async_add_executor_job(
|
||||||
self._state_changes_during_period,
|
self._state_changes_during_period,
|
||||||
current_period_start_timestamp,
|
current_period_start_timestamp,
|
||||||
current_period_end_timestamp,
|
current_period_end_timestamp,
|
||||||
)
|
)
|
||||||
|
finally:
|
||||||
|
self._query_count -= 1
|
||||||
self._history_current_period = [
|
self._history_current_period = [
|
||||||
HistoryState(state.state, state.last_changed.timestamp())
|
HistoryState(state.state, state.last_changed.timestamp())
|
||||||
for state in states
|
for state in states
|
||||||
@ -208,6 +227,9 @@ class HistoryStats:
|
|||||||
current_state_matches = history_state.state in self._entity_states
|
current_state_matches = history_state.state in self._entity_states
|
||||||
state_change_timestamp = history_state.last_changed
|
state_change_timestamp = history_state.last_changed
|
||||||
|
|
||||||
|
if math.floor(state_change_timestamp) > end_timestamp:
|
||||||
|
break
|
||||||
|
|
||||||
if math.floor(state_change_timestamp) > now_timestamp:
|
if math.floor(state_change_timestamp) > now_timestamp:
|
||||||
# Shouldn't count states that are in the future
|
# Shouldn't count states that are in the future
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
@ -215,7 +237,7 @@ class HistoryStats:
|
|||||||
state_change_timestamp,
|
state_change_timestamp,
|
||||||
now_timestamp,
|
now_timestamp,
|
||||||
)
|
)
|
||||||
continue
|
break
|
||||||
|
|
||||||
if previous_state_matches:
|
if previous_state_matches:
|
||||||
elapsed += state_change_timestamp - last_state_change_timestamp
|
elapsed += state_change_timestamp - last_state_change_timestamp
|
||||||
|
@ -1017,6 +1017,18 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
}
|
}
|
||||||
for i, sensor_type in enumerate(["time", "ratio", "count"])
|
for i, sensor_type in enumerate(["time", "ratio", "count"])
|
||||||
]
|
]
|
||||||
|
+ [
|
||||||
|
{
|
||||||
|
"platform": "history_stats",
|
||||||
|
"entity_id": "binary_sensor.state",
|
||||||
|
"name": f"sensor_delayed{i}",
|
||||||
|
"state": "on",
|
||||||
|
"end": "{{ utcnow()-timedelta(minutes=5) }}",
|
||||||
|
"duration": {"minutes": 55},
|
||||||
|
"type": sensor_type,
|
||||||
|
}
|
||||||
|
for i, sensor_type in enumerate(["time", "ratio", "count"])
|
||||||
|
]
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
@ -1028,6 +1040,9 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.0"
|
assert hass.states.get("sensor.sensor0").state == "0.0"
|
||||||
assert hass.states.get("sensor.sensor1").state == "0.0"
|
assert hass.states.get("sensor.sensor1").state == "0.0"
|
||||||
assert hass.states.get("sensor.sensor2").state == "0"
|
assert hass.states.get("sensor.sensor2").state == "0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "0.0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "0"
|
||||||
|
|
||||||
with freeze_time(time):
|
with freeze_time(time):
|
||||||
hass.states.async_set("binary_sensor.state", "on")
|
hass.states.async_set("binary_sensor.state", "on")
|
||||||
@ -1038,6 +1053,10 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.0"
|
assert hass.states.get("sensor.sensor0").state == "0.0"
|
||||||
assert hass.states.get("sensor.sensor1").state == "0.0"
|
assert hass.states.get("sensor.sensor1").state == "0.0"
|
||||||
assert hass.states.get("sensor.sensor2").state == "1"
|
assert hass.states.get("sensor.sensor2").state == "1"
|
||||||
|
# Delayed sensor will not have registered the turn on yet
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "0.0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "0"
|
||||||
|
|
||||||
# After sensor has been on for 15 minutes, check state
|
# After sensor has been on for 15 minutes, check state
|
||||||
time += timedelta(minutes=15) # 00:15
|
time += timedelta(minutes=15) # 00:15
|
||||||
@ -1048,6 +1067,10 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.25"
|
assert hass.states.get("sensor.sensor0").state == "0.25"
|
||||||
assert hass.states.get("sensor.sensor1").state == "25.0"
|
assert hass.states.get("sensor.sensor1").state == "25.0"
|
||||||
assert hass.states.get("sensor.sensor2").state == "1"
|
assert hass.states.get("sensor.sensor2").state == "1"
|
||||||
|
# Delayed sensor will only have data from 00:00 - 00:10
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.17"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "18.2" # 10 / 55
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "1"
|
||||||
|
|
||||||
with freeze_time(time):
|
with freeze_time(time):
|
||||||
hass.states.async_set("binary_sensor.state", "off")
|
hass.states.async_set("binary_sensor.state", "off")
|
||||||
@ -1064,6 +1087,9 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.25"
|
assert hass.states.get("sensor.sensor0").state == "0.25"
|
||||||
assert hass.states.get("sensor.sensor1").state == "25.0"
|
assert hass.states.get("sensor.sensor1").state == "25.0"
|
||||||
assert hass.states.get("sensor.sensor2").state == "1"
|
assert hass.states.get("sensor.sensor2").state == "1"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.25"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "27.3" # 15 / 55
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "1"
|
||||||
|
|
||||||
time += timedelta(minutes=20) # 01:05
|
time += timedelta(minutes=20) # 01:05
|
||||||
|
|
||||||
@ -1075,6 +1101,9 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.17"
|
assert hass.states.get("sensor.sensor0").state == "0.17"
|
||||||
assert hass.states.get("sensor.sensor1").state == "16.7"
|
assert hass.states.get("sensor.sensor1").state == "16.7"
|
||||||
assert hass.states.get("sensor.sensor2").state == "1"
|
assert hass.states.get("sensor.sensor2").state == "1"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.17"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "18.2" # 10 / 55
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "1"
|
||||||
|
|
||||||
time += timedelta(minutes=5) # 01:10
|
time += timedelta(minutes=5) # 01:10
|
||||||
|
|
||||||
@ -1086,6 +1115,9 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.08"
|
assert hass.states.get("sensor.sensor0").state == "0.08"
|
||||||
assert hass.states.get("sensor.sensor1").state == "8.3"
|
assert hass.states.get("sensor.sensor1").state == "8.3"
|
||||||
assert hass.states.get("sensor.sensor2").state == "1"
|
assert hass.states.get("sensor.sensor2").state == "1"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.08"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "9.1" # 5 / 55
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "1"
|
||||||
|
|
||||||
time += timedelta(minutes=10) # 01:20
|
time += timedelta(minutes=10) # 01:20
|
||||||
|
|
||||||
@ -1096,6 +1128,9 @@ async def test_start_from_history_then_watch_state_changes_sliding(
|
|||||||
assert hass.states.get("sensor.sensor0").state == "0.0"
|
assert hass.states.get("sensor.sensor0").state == "0.0"
|
||||||
assert hass.states.get("sensor.sensor1").state == "0.0"
|
assert hass.states.get("sensor.sensor1").state == "0.0"
|
||||||
assert hass.states.get("sensor.sensor2").state == "0"
|
assert hass.states.get("sensor.sensor2").state == "0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed0").state == "0.0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed1").state == "0.0"
|
||||||
|
assert hass.states.get("sensor.sensor_delayed2").state == "0"
|
||||||
|
|
||||||
|
|
||||||
async def test_does_not_work_into_the_future(
|
async def test_does_not_work_into_the_future(
|
||||||
@ -1629,7 +1664,7 @@ async def test_state_change_during_window_rollover(
|
|||||||
"entity_id": "binary_sensor.state",
|
"entity_id": "binary_sensor.state",
|
||||||
"name": "sensor1",
|
"name": "sensor1",
|
||||||
"state": "on",
|
"state": "on",
|
||||||
"start": "{{ today_at() }}",
|
"start": "{{ today_at('12:00') if now().hour == 1 else today_at() }}",
|
||||||
"end": "{{ now() }}",
|
"end": "{{ now() }}",
|
||||||
"type": "time",
|
"type": "time",
|
||||||
}
|
}
|
||||||
@ -1644,7 +1679,7 @@ async def test_state_change_during_window_rollover(
|
|||||||
assert hass.states.get("sensor.sensor1").state == "11.0"
|
assert hass.states.get("sensor.sensor1").state == "11.0"
|
||||||
|
|
||||||
# Advance 59 minutes, to record the last minute update just before midnight, just like a real system would do.
|
# Advance 59 minutes, to record the last minute update just before midnight, just like a real system would do.
|
||||||
t2 = start_time + timedelta(minutes=59, microseconds=300)
|
t2 = start_time + timedelta(minutes=59, microseconds=300) # 23:59
|
||||||
with freeze_time(t2):
|
with freeze_time(t2):
|
||||||
async_fire_time_changed(hass, t2)
|
async_fire_time_changed(hass, t2)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
@ -1653,7 +1688,7 @@ async def test_state_change_during_window_rollover(
|
|||||||
|
|
||||||
# One minute has passed and the time has now rolled over into a new day, resetting the recorder window.
|
# One minute has passed and the time has now rolled over into a new day, resetting the recorder window.
|
||||||
# The sensor will be ON since midnight.
|
# The sensor will be ON since midnight.
|
||||||
t3 = t2 + timedelta(minutes=1)
|
t3 = t2 + timedelta(minutes=1) # 00:01
|
||||||
with freeze_time(t3):
|
with freeze_time(t3):
|
||||||
# The sensor turns off around this time, before the sensor does its normal polled update.
|
# The sensor turns off around this time, before the sensor does its normal polled update.
|
||||||
hass.states.async_set("binary_sensor.state", "off")
|
hass.states.async_set("binary_sensor.state", "off")
|
||||||
@ -1662,13 +1697,69 @@ async def test_state_change_during_window_rollover(
|
|||||||
assert hass.states.get("sensor.sensor1").state == "0.0"
|
assert hass.states.get("sensor.sensor1").state == "0.0"
|
||||||
|
|
||||||
# More time passes, and the history stats does a polled update again. It should be 0 since the sensor has been off since midnight.
|
# More time passes, and the history stats does a polled update again. It should be 0 since the sensor has been off since midnight.
|
||||||
t4 = t3 + timedelta(minutes=10)
|
# Turn the sensor back on.
|
||||||
|
t4 = t3 + timedelta(minutes=10) # 00:10
|
||||||
with freeze_time(t4):
|
with freeze_time(t4):
|
||||||
async_fire_time_changed(hass, t4)
|
async_fire_time_changed(hass, t4)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
hass.states.async_set("binary_sensor.state", "on")
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
assert hass.states.get("sensor.sensor1").state == "0.0"
|
assert hass.states.get("sensor.sensor1").state == "0.0"
|
||||||
|
|
||||||
|
# Due to time change, start time has now moved into the future. Turn off the sensor.
|
||||||
|
t5 = t4 + timedelta(hours=1) # 01:10
|
||||||
|
with freeze_time(t5):
|
||||||
|
hass.states.async_set("binary_sensor.state", "off")
|
||||||
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
|
assert hass.states.get("sensor.sensor1").state == STATE_UNKNOWN
|
||||||
|
|
||||||
|
# Start time has moved back to start of today. Turn the sensor on at the same time it is recomputed
|
||||||
|
# Should query the recorder this time due to start time moving backwards in time.
|
||||||
|
t6 = t5 + timedelta(hours=1) # 02:10
|
||||||
|
|
||||||
|
def _fake_states_t6(*args, **kwargs):
|
||||||
|
return {
|
||||||
|
"binary_sensor.state": [
|
||||||
|
ha.State(
|
||||||
|
"binary_sensor.state",
|
||||||
|
"off",
|
||||||
|
last_changed=t6.replace(hour=0, minute=0, second=0, microsecond=0),
|
||||||
|
),
|
||||||
|
ha.State(
|
||||||
|
"binary_sensor.state",
|
||||||
|
"on",
|
||||||
|
last_changed=t6.replace(hour=0, minute=10, second=0, microsecond=0),
|
||||||
|
),
|
||||||
|
ha.State(
|
||||||
|
"binary_sensor.state",
|
||||||
|
"off",
|
||||||
|
last_changed=t6.replace(hour=1, minute=10, second=0, microsecond=0),
|
||||||
|
),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch(
|
||||||
|
"homeassistant.components.recorder.history.state_changes_during_period",
|
||||||
|
_fake_states_t6,
|
||||||
|
),
|
||||||
|
freeze_time(t6),
|
||||||
|
):
|
||||||
|
hass.states.async_set("binary_sensor.state", "on")
|
||||||
|
await hass.async_block_till_done(wait_background_tasks=True)
|
||||||
|
|
||||||
|
assert hass.states.get("sensor.sensor1").state == "1.0"
|
||||||
|
|
||||||
|
# Another hour passes since the re-query. Total 'On' time should be 2 hours (00:10-1:10, 2:10-now (3:10))
|
||||||
|
t7 = t6 + timedelta(hours=1) # 03:10
|
||||||
|
with freeze_time(t7):
|
||||||
|
async_fire_time_changed(hass, t7)
|
||||||
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
assert hass.states.get("sensor.sensor1").state == "2.0"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii"])
|
@pytest.mark.parametrize("time_zone", ["Europe/Berlin", "America/Chicago", "US/Hawaii"])
|
||||||
async def test_end_time_with_microseconds_zeroed(
|
async def test_end_time_with_microseconds_zeroed(
|
||||||
@ -1934,7 +2025,7 @@ async def test_history_stats_handles_floored_timestamps(
|
|||||||
await async_update_entity(hass, "sensor.sensor1")
|
await async_update_entity(hass, "sensor.sensor1")
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
assert last_times == (start_time, start_time + timedelta(hours=2))
|
assert last_times == (start_time, start_time)
|
||||||
|
|
||||||
|
|
||||||
async def test_unique_id(
|
async def test_unique_id(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user