diff --git a/homeassistant/components/input_boolean/reproduce_state.py b/homeassistant/components/input_boolean/reproduce_state.py new file mode 100644 index 00000000000..b8bc18edfac --- /dev/null +++ b/homeassistant/components/input_boolean/reproduce_state.py @@ -0,0 +1,57 @@ +"""Reproduce an input boolean state.""" +import asyncio +import logging +from typing import Iterable, Optional + +from homeassistant.const import ( + SERVICE_TURN_OFF, + SERVICE_TURN_ON, + STATE_ON, + STATE_OFF, + ATTR_ENTITY_ID, +) +from homeassistant.core import Context, State +from homeassistant.helpers.typing import HomeAssistantType + +from . import DOMAIN + +_LOGGER = logging.getLogger(__name__) + + +async def _async_reproduce_states( + hass: HomeAssistantType, state: State, context: Optional[Context] = None +) -> None: + """Reproduce input boolean states.""" + cur_state = hass.states.get(state.entity_id) + + if cur_state is None: + _LOGGER.warning("Unable to find entity %s", state.entity_id) + return + + if state.state not in (STATE_ON, STATE_OFF): + _LOGGER.warning( + "Invalid state specified for %s: %s", state.entity_id, state.state + ) + return + + if cur_state.state == state.state: + return + + service = SERVICE_TURN_ON if state.state == STATE_ON else SERVICE_TURN_OFF + + await hass.services.async_call( + DOMAIN, + service, + {ATTR_ENTITY_ID: state.entity_id}, + context=context, + blocking=True, + ) + + +async def async_reproduce_states( + hass: HomeAssistantType, states: Iterable[State], context: Optional[Context] = None +) -> None: + """Reproduce component states.""" + await asyncio.gather( + *(_async_reproduce_states(hass, state, context) for state in states) + ) diff --git a/tests/components/input_boolean/test_reproduce_state.py b/tests/components/input_boolean/test_reproduce_state.py new file mode 100644 index 00000000000..7ce4f4c1fd1 --- /dev/null +++ b/tests/components/input_boolean/test_reproduce_state.py @@ -0,0 +1,41 @@ +"""Test reproduce state for input boolean.""" +from homeassistant.core import State +from homeassistant.setup import async_setup_component + + +async def test_reproducing_states(hass): + """Test reproducing input_boolean states.""" + assert await async_setup_component( + hass, + "input_boolean", + { + "input_boolean": { + "initial_on": {"initial": True}, + "initial_off": {"initial": False}, + } + }, + ) + await hass.helpers.state.async_reproduce_state( + [ + State("input_boolean.initial_on", "off"), + State("input_boolean.initial_off", "on"), + # Should not raise + State("input_boolean.non_existing", "on"), + ], + blocking=True, + ) + assert hass.states.get("input_boolean.initial_off").state == "on" + assert hass.states.get("input_boolean.initial_on").state == "off" + + await hass.helpers.state.async_reproduce_state( + [ + # Test invalid state + State("input_boolean.initial_on", "invalid_state"), + # Set to state it already is. + State("input_boolean.initial_off", "on"), + ], + blocking=True, + ) + + assert hass.states.get("input_boolean.initial_on").state == "off" + assert hass.states.get("input_boolean.initial_off").state == "on"