diff --git a/homeassistant/components/homeassistant/triggers/state.py b/homeassistant/components/homeassistant/triggers/state.py index 91629cc9933..e6a4b90dbe8 100644 --- a/homeassistant/components/homeassistant/triggers/state.py +++ b/homeassistant/components/homeassistant/triggers/state.py @@ -36,6 +36,8 @@ _LOGGER = logging.getLogger(__name__) CONF_ENTITY_ID = "entity_id" CONF_FROM = "from" CONF_TO = "to" +CONF_NOT_FROM = "not_from" +CONF_NOT_TO = "not_to" BASE_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend( { @@ -49,15 +51,19 @@ BASE_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend( TRIGGER_STATE_SCHEMA = BASE_SCHEMA.extend( { # These are str on purpose. Want to catch YAML conversions - vol.Optional(CONF_FROM): vol.Any(str, [str], None), - vol.Optional(CONF_TO): vol.Any(str, [str], None), + vol.Exclusive(CONF_FROM, CONF_FROM): vol.Any(str, [str], None), + vol.Exclusive(CONF_NOT_FROM, CONF_FROM): vol.Any(str, [str], None), + vol.Exclusive(CONF_TO, CONF_TO): vol.Any(str, [str], None), + vol.Exclusive(CONF_NOT_TO, CONF_TO): vol.Any(str, [str], None), } ) TRIGGER_ATTRIBUTE_SCHEMA = BASE_SCHEMA.extend( { - vol.Optional(CONF_FROM): cv.match_all, - vol.Optional(CONF_TO): cv.match_all, + vol.Exclusive(CONF_FROM, CONF_FROM): cv.match_all, + vol.Exclusive(CONF_NOT_FROM, CONF_FROM): cv.match_all, + vol.Exclusive(CONF_TO, CONF_TO): cv.match_all, + vol.Exclusive(CONF_NOT_TO, CONF_TO): cv.match_all, } ) @@ -94,19 +100,30 @@ async def async_attach_trigger( ) -> CALLBACK_TYPE: """Listen for state changes based on configuration.""" entity_ids = config[CONF_ENTITY_ID] - if (from_state := config.get(CONF_FROM)) is None: - from_state = MATCH_ALL - if (to_state := config.get(CONF_TO)) is None: - to_state = MATCH_ALL + + if (from_state := config.get(CONF_FROM)) is not None: + match_from_state = process_state_match(from_state) + elif (not_from_state := config.get(CONF_NOT_FROM)) is not None: + match_from_state = process_state_match(not_from_state, invert=True) + else: + match_from_state = process_state_match(MATCH_ALL) + + if (to_state := config.get(CONF_TO)) is not None: + match_to_state = process_state_match(to_state) + elif (not_to_state := config.get(CONF_NOT_TO)) is not None: + match_to_state = process_state_match(not_to_state, invert=True) + else: + match_to_state = process_state_match(MATCH_ALL) + time_delta = config.get(CONF_FOR) template.attach(hass, time_delta) # If neither CONF_FROM or CONF_TO are specified, # fire on all changes to the state or an attribute - match_all = CONF_FROM not in config and CONF_TO not in config + match_all = all( + item not in config for item in (CONF_FROM, CONF_NOT_FROM, CONF_NOT_TO, CONF_TO) + ) unsub_track_same = {} period: dict[str, timedelta] = {} - match_from_state = process_state_match(from_state) - match_to_state = process_state_match(to_state) attribute = config.get(CONF_ATTRIBUTE) job = HassJob(action) diff --git a/homeassistant/helpers/event.py b/homeassistant/helpers/event.py index be855e19f4a..b85af09cfb9 100644 --- a/homeassistant/helpers/event.py +++ b/homeassistant/helpers/event.py @@ -1532,17 +1532,17 @@ track_time_change = threaded_listener_factory(async_track_time_change) def process_state_match( - parameter: None | str | Iterable[str], + parameter: None | str | Iterable[str], invert: bool = False ) -> Callable[[str | None], bool]: """Convert parameter to function that matches input against parameter.""" if parameter is None or parameter == MATCH_ALL: - return lambda _: True + return lambda _: not invert if isinstance(parameter, str) or not hasattr(parameter, "__iter__"): - return lambda state: state == parameter + return lambda state: invert is not (state == parameter) parameter_set = set(parameter) - return lambda state: state in parameter_set + return lambda state: invert is not (state in parameter_set) @callback diff --git a/tests/components/homeassistant/triggers/test_state.py b/tests/components/homeassistant/triggers/test_state.py index 026f096022b..44f55dcb4ac 100644 --- a/tests/components/homeassistant/triggers/test_state.py +++ b/tests/components/homeassistant/triggers/test_state.py @@ -7,7 +7,7 @@ import pytest import homeassistant.components.automation as automation from homeassistant.components.homeassistant.triggers import state as state_trigger from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, SERVICE_TURN_OFF -from homeassistant.core import Context +from homeassistant.core import Context, HomeAssistant, ServiceCall from homeassistant.helpers import entity_registry as er from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util @@ -21,7 +21,7 @@ from tests.common import ( @pytest.fixture -def calls(hass): +def calls(hass: HomeAssistant) -> list[ServiceCall]: """Track calls to a mock service.""" return async_mock_service(hass, "test", "automation") @@ -164,6 +164,36 @@ async def test_if_fires_on_entity_change_with_from_filter(hass, calls): assert len(calls) == 1 +async def test_if_fires_on_entity_change_with_not_from_filter( + hass: HomeAssistant, calls: list[ServiceCall] +) -> None: + """Test for firing on entity change inverse filter.""" + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": { + "platform": "state", + "entity_id": "test.entity", + "not_from": "hello", + }, + "action": {"service": "test.automation"}, + } + }, + ) + await hass.async_block_till_done() + + # Do not fire from hello + hass.states.async_set("test.entity", "world") + await hass.async_block_till_done() + assert not calls + + hass.states.async_set("test.entity", "universum") + await hass.async_block_till_done() + assert len(calls) == 1 + + async def test_if_fires_on_entity_change_with_to_filter(hass, calls): """Test for firing on entity change with to filter.""" assert await async_setup_component( @@ -187,6 +217,36 @@ async def test_if_fires_on_entity_change_with_to_filter(hass, calls): assert len(calls) == 1 +async def test_if_fires_on_entity_change_with_not_to_filter( + hass: HomeAssistant, calls: list[ServiceCall] +) -> None: + """Test for firing on entity change with to filter.""" + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": { + "platform": "state", + "entity_id": "test.entity", + "not_to": "world", + }, + "action": {"service": "test.automation"}, + } + }, + ) + await hass.async_block_till_done() + + # Do not fire to world + hass.states.async_set("test.entity", "world") + await hass.async_block_till_done() + assert not calls + + hass.states.async_set("test.entity", "universum") + await hass.async_block_till_done() + assert len(calls) == 1 + + async def test_if_fires_on_entity_change_with_from_filter_all(hass, calls): """Test for firing on entity change with filter.""" assert await async_setup_component( @@ -283,6 +343,100 @@ async def test_if_fires_on_entity_change_with_both_filters(hass, calls): assert len(calls) == 1 +async def test_if_fires_on_entity_change_with_not_from_to( + hass: HomeAssistant, calls: list[ServiceCall] +) -> None: + """Test for firing if not from doesn't match and to match.""" + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": { + "platform": "state", + "entity_id": "test.entity", + "not_from": ["hello", "galaxy"], + "to": ["galaxy", "universe"], + }, + "action": {"service": "test.automation"}, + } + }, + ) + await hass.async_block_till_done() + + # We should not trigger from hello + hass.states.async_set("test.entity", "world") + await hass.async_block_till_done() + assert not calls + + # We should not trigger to != galaxy + hass.states.async_set("test.entity", "world") + await hass.async_block_till_done() + assert not calls + + # We should trigger to galaxy + hass.states.async_set("test.entity", "galaxy") + await hass.async_block_till_done() + assert len(calls) == 1 + + # We should not trigger from milky way + hass.states.async_set("test.entity", "milky_way") + await hass.async_block_till_done() + assert len(calls) == 1 + + # We should trigger to universe + hass.states.async_set("test.entity", "universe") + await hass.async_block_till_done() + assert len(calls) == 2 + + +async def test_if_fires_on_entity_change_with_from_not_to( + hass: HomeAssistant, calls: list[ServiceCall] +) -> None: + """Test for firing if not from doesn't match and to match.""" + assert await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": { + "platform": "state", + "entity_id": "test.entity", + "from": ["hello", "galaxy"], + "not_to": ["galaxy", "universe"], + }, + "action": {"service": "test.automation"}, + } + }, + ) + await hass.async_block_till_done() + + # We should trigger to world from hello + hass.states.async_set("test.entity", "world") + await hass.async_block_till_done() + assert len(calls) == 1 + + # Reset back to hello, should not trigger + hass.states.async_set("test.entity", "hello") + await hass.async_block_till_done() + assert len(calls) == 1 + + # We should not trigger to galaxy + hass.states.async_set("test.entity", "galaxy") + await hass.async_block_till_done() + assert len(calls) == 1 + + # We should trigger form galaxy to milky way + hass.states.async_set("test.entity", "milky_way") + await hass.async_block_till_done() + assert len(calls) == 2 + + # We should not trigger to universe + hass.states.async_set("test.entity", "universe") + await hass.async_block_till_done() + assert len(calls) == 2 + + async def test_if_not_fires_if_to_filter_not_match(hass, calls): """Test for not firing if to filter is not a match.""" assert await async_setup_component(