Compare commits

...

3 Commits

Author SHA1 Message Date
Erik
028f1aece5 Adjust comment 2026-04-09 07:48:29 +02:00
Erik
a141fb195a Change for to optional 2026-04-09 07:46:26 +02:00
Erik
459f247620 Add duration to state based entity triggers 2026-04-09 07:36:06 +02:00
4 changed files with 638 additions and 23 deletions

View File

@@ -1,7 +1,8 @@
{
"common": {
"condition_behavior_name": "Condition passes if",
"trigger_behavior_name": "Trigger when"
"trigger_behavior_name": "Trigger when",
"trigger_for_name": "For at least"
},
"conditions": {
"is_detected": {
@@ -45,6 +46,9 @@
"fields": {
"behavior": {
"name": "[%key:component::motion::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::motion::common::trigger_for_name%]"
}
},
"name": "Motion cleared"
@@ -54,6 +58,9 @@
"fields": {
"behavior": {
"name": "[%key:component::motion::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::motion::common::trigger_for_name%]"
}
},
"name": "Motion detected"

View File

@@ -9,6 +9,11 @@
- first
- last
- any
for:
required: true
default: 00:00:00
selector:
duration:
detected:
fields: *trigger_common_fields

View File

@@ -7,6 +7,7 @@ import asyncio
from collections import defaultdict
from collections.abc import Callable, Coroutine, Iterable, Mapping
from dataclasses import dataclass, field
from datetime import timedelta
import functools
import inspect
import logging
@@ -31,6 +32,7 @@ from homeassistant.const import (
CONF_ENABLED,
CONF_ENTITY_ID,
CONF_EVENT_DATA,
CONF_FOR,
CONF_ID,
CONF_OPTIONS,
CONF_PLATFORM,
@@ -74,6 +76,7 @@ from .automation import (
get_relative_description_key,
move_options_fields_to_top_level,
)
from .event import async_track_same_state
from .integration_platform import async_process_integration_platforms
from .selector import (
NumericThresholdMode,
@@ -340,6 +343,7 @@ ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST = ENTITY_STATE_TRIGGER_SCHEMA.extend(
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_FIRST, BEHAVIOR_LAST, BEHAVIOR_ANY]
),
vol.Optional(CONF_FOR): cv.positive_time_period_dict,
},
}
)
@@ -365,6 +369,7 @@ class EntityTriggerBase(Trigger):
if TYPE_CHECKING:
assert config.target is not None
self._options = config.options or {}
self._duration: timedelta | None = self._options.get(CONF_FOR)
self._target = config.target
def entity_filter(self, entities: set[str]) -> set[str]:
@@ -394,15 +399,12 @@ class EntityTriggerBase(Trigger):
if (state := self._hass.states.get(entity_id)) is not None
)
def check_one_match(self, entity_ids: set[str]) -> bool:
"""Check that only one entity state matches."""
return (
sum(
self.is_valid_state(state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
)
== 1
def count_matches(self, entity_ids: set[str]) -> int:
"""Count the number of entity states that match."""
return sum(
self.is_valid_state(state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
)
@override
@@ -411,7 +413,8 @@ class EntityTriggerBase(Trigger):
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
behavior = self._options.get(ATTR_BEHAVIOR)
behavior: str = self._options.get(ATTR_BEHAVIOR, BEHAVIOR_ANY)
unsub_track_same: dict[str, Callable[[], None]] = {}
@callback
def state_change_listener(
@@ -423,6 +426,31 @@ class EntityTriggerBase(Trigger):
from_state = event.data["old_state"]
to_state = event.data["new_state"]
def state_still_valid(
_: str, from_state: State | None, to_state: State | None
) -> bool:
"""Check if the state is still valid during the duration wait.
Called by async_track_same_state on each state change to
determine whether to cancel the timer.
For behavior any, checks the individual entity's state.
For behavior first/last, checks the combined state.
"""
if not from_state or not to_state:
return False
if behavior == BEHAVIOR_LAST:
return self.check_all_match(
target_state_change_data.targeted_entity_ids
)
if behavior == BEHAVIOR_FIRST:
return (
self.count_matches(target_state_change_data.targeted_entity_ids)
>= 1
)
# Behavior any: check the individual entity's state
return self.is_valid_state(to_state)
if not from_state or not to_state:
return
@@ -440,25 +468,59 @@ class EntityTriggerBase(Trigger):
):
return
elif behavior == BEHAVIOR_FIRST:
if not self.check_one_match(
target_state_change_data.targeted_entity_ids
if (
self.count_matches(target_state_change_data.targeted_entity_ids)
!= 1
):
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"state of {entity_id}",
event.context,
@callback
def call_action() -> None:
"""Call action with right context."""
# After a `for` delay, keep the original triggering event payload.
# `async_track_same_state` only verifies the state remained valid
# for the configured duration before firing the action.
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"state of {entity_id}",
event.context,
)
if not self._duration:
call_action()
return
subscription_key = entity_id if behavior == BEHAVIOR_ANY else behavior
if subscription_key in unsub_track_same:
unsub_track_same.pop(subscription_key)()
unsub_track_same[subscription_key] = async_track_same_state(
self._hass,
self._duration,
call_action,
state_still_valid,
entity_ids=entity_id
if behavior == BEHAVIOR_ANY
else target_state_change_data.targeted_entity_ids,
)
return async_track_target_selector_state_change_event(
unsub = async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, self.entity_filter
)
@callback
def async_remove() -> None:
"""Remove state listeners async."""
unsub()
for async_remove in unsub_track_same.values():
async_remove()
unsub_track_same.clear()
return async_remove
class EntityTargetStateTriggerBase(EntityTriggerBase):
"""Trigger for entity state changes to a specific state.

View File

@@ -2,10 +2,13 @@
from collections.abc import Mapping
from contextlib import AbstractContextManager, nullcontext as does_not_raise
import datetime
import io
import logging
from typing import Any
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from pytest_unordered import unordered
import voluptuous as vol
@@ -19,9 +22,12 @@ from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_UNIT_OF_MEASUREMENT,
CONF_ENTITY_ID,
CONF_FOR,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TARGET,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
UnitOfTemperature,
@@ -41,6 +47,10 @@ from homeassistant.helpers.automation import (
move_top_level_schema_fields_to_options,
)
from homeassistant.helpers.trigger import (
ATTR_BEHAVIOR,
BEHAVIOR_ANY,
BEHAVIOR_FIRST,
BEHAVIOR_LAST,
DATA_PLUGGABLE_ACTIONS,
TRIGGERS,
EntityNumericalStateChangedTriggerWithUnitBase,
@@ -65,7 +75,13 @@ from homeassistant.setup import async_setup_component
from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
from tests.common import (
MockModule,
MockPlatform,
async_fire_time_changed,
mock_integration,
mock_platform,
)
from tests.typing import WebSocketGenerator
@@ -3080,3 +3096,528 @@ async def test_make_entity_origin_state_trigger(
# To-state still matches from_state — not valid
assert not trig.is_valid_state(from_state)
class _OnOffTrigger(EntityTriggerBase):
"""Test trigger that fires when state becomes 'on'."""
_domain_specs = {"test": DomainSpec()}
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Valid if transitioning from a non-'on' state."""
if from_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
return False
return from_state.state != STATE_ON
def is_valid_state(self, state: State) -> bool:
"""Valid if the state is 'on'."""
return state.state == STATE_ON
async def _arm_on_off_trigger(
hass: HomeAssistant,
entity_ids: list[str],
behavior: str,
calls: list[dict[str, Any]],
duration: dict[str, int] | None = None,
) -> CALLBACK_TYPE:
"""Set up _OnOffTrigger via async_initialize_triggers."""
async def async_get_triggers(
hass: HomeAssistant,
) -> dict[str, type[Trigger]]:
return {"on_off": _OnOffTrigger}
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
options: dict[str, Any] = {ATTR_BEHAVIOR: behavior}
if duration is not None:
options[CONF_FOR] = duration
trigger_config = {
CONF_PLATFORM: "test.on_off",
CONF_TARGET: {CONF_ENTITY_ID: entity_ids},
CONF_OPTIONS: options,
}
log = logging.getLogger(__name__)
@callback
def action(run_variables: dict[str, Any], context: Context | None = None) -> None:
calls.append(run_variables["trigger"])
validated_config = await async_validate_trigger_config(hass, [trigger_config])
return await async_initialize_triggers(
hass,
validated_config,
action,
domain="test",
name="test_on_off",
log_cb=log.log,
)
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_no_duration(hass: HomeAssistant, behavior: str) -> None:
"""Test EntityTriggerBase fires immediately without duration."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(hass, [entity_id], behavior, calls)
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
# Transition back and trigger again
calls.clear()
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_with_duration(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test EntityTriggerBase waits for duration before firing."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
# Turn on — should NOT fire immediately
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Advance time past duration — should fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_duration_cancelled_on_state_change(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test that the duration timer is cancelled if state changes back."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
# Turn on, then back off before duration expires
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
# Advance past the original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_any_independent(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any tracks per-entity durations independently."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn A on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Turn B on 2 seconds later
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# After 5s from A's turn-on, A should fire
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_a
# After 5s from B's turn-on (2 more seconds), B should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[1]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_duration_any_entity_off_cancels_only_that_entity(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any: turning off one entity doesn't cancel the other's timer."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds — cancels A's timer but not B's
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# After 5s total, B should fire but A should not
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_duration_last_requires_all(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: trigger fires only when ALL entities are on for duration."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn only A on — should not start timer (not all match)
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# Turn B on — now all match, timer starts
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_last_cancelled_when_one_turns_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: timer is cancelled when one entity turns off."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_last_timer_reset(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: timer resets when combined state goes off and back on."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn both on — combined state "all on", timer starts
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# After 2 seconds, B turns off — combined state breaks, timer cancelled
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
# B turns back on — combined state restored, timer restarts
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# 4 seconds after restart (not enough) — should NOT fire
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# 1 more second (5 total from restart) — should fire
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_fires_when_any_on(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: trigger fires when first entity turns on for duration."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn A on — combined state goes to "at least one on", timer starts
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Advance past duration — should fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_not_cancelled_by_second(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: second entity turning on doesn't restart timer."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn A on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
# Turn B on 3 seconds later — combined state was already "any on",
# so this should NOT restart the timer
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# 2 more seconds (5 total from A) — should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_not_cancelled_by_partial_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: one entity off doesn't cancel if another is still on."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds — combined state still "any on" (B is on)
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past duration — should still fire (combined state never went to "none on")
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_cancelled_when_all_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: timer cancelled when ALL entities turn off."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn both off after 2 seconds — combined state goes to "none on"
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
# Advance past original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_any_retrigger_resets_timer(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any: turning an entity off and on resets its timer."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_on_off_trigger(
hass, [entity_id], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn on
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# After 3 seconds, turn off and on again — resets the timer
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# 3 more seconds (6 from start, but only 3 from retrigger) — should NOT fire
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# 2 more seconds (5 from retrigger) — should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()