mirror of
https://github.com/home-assistant/core.git
synced 2025-07-19 11:17:21 +00:00
lock_reproduce_state (#27203)
This commit is contained in:
parent
0be1269b20
commit
cc4926afb1
61
homeassistant/components/lock/reproduce_state.py
Normal file
61
homeassistant/components/lock/reproduce_state.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
"""Reproduce an Lock state."""
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from typing import Iterable, Optional
|
||||||
|
|
||||||
|
from homeassistant.const import (
|
||||||
|
ATTR_ENTITY_ID,
|
||||||
|
STATE_LOCKED,
|
||||||
|
STATE_UNLOCKED,
|
||||||
|
SERVICE_LOCK,
|
||||||
|
SERVICE_UNLOCK,
|
||||||
|
)
|
||||||
|
from homeassistant.core import Context, State
|
||||||
|
from homeassistant.helpers.typing import HomeAssistantType
|
||||||
|
|
||||||
|
from . import DOMAIN
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
VALID_STATES = {STATE_LOCKED, STATE_UNLOCKED}
|
||||||
|
|
||||||
|
|
||||||
|
async def _async_reproduce_state(
|
||||||
|
hass: HomeAssistantType, state: State, context: Optional[Context] = None
|
||||||
|
) -> None:
|
||||||
|
"""Reproduce a single state."""
|
||||||
|
cur_state = hass.states.get(state.entity_id)
|
||||||
|
|
||||||
|
if cur_state is None:
|
||||||
|
_LOGGER.warning("Unable to find entity %s", state.entity_id)
|
||||||
|
return
|
||||||
|
|
||||||
|
if state.state not in VALID_STATES:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Invalid state specified for %s: %s", state.entity_id, state.state
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Return if we are already at the right state.
|
||||||
|
if cur_state.state == state.state:
|
||||||
|
return
|
||||||
|
|
||||||
|
service_data = {ATTR_ENTITY_ID: state.entity_id}
|
||||||
|
|
||||||
|
if state.state == STATE_LOCKED:
|
||||||
|
service = SERVICE_LOCK
|
||||||
|
elif state.state == STATE_UNLOCKED:
|
||||||
|
service = SERVICE_UNLOCK
|
||||||
|
|
||||||
|
await hass.services.async_call(
|
||||||
|
DOMAIN, service, service_data, context=context, blocking=True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def async_reproduce_states(
|
||||||
|
hass: HomeAssistantType, states: Iterable[State], context: Optional[Context] = None
|
||||||
|
) -> None:
|
||||||
|
"""Reproduce Lock states."""
|
||||||
|
await asyncio.gather(
|
||||||
|
*(_async_reproduce_state(hass, state, context) for state in states)
|
||||||
|
)
|
53
tests/components/lock/test_reproduce_state.py
Normal file
53
tests/components/lock/test_reproduce_state.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
"""Test reproduce state for Lock."""
|
||||||
|
from homeassistant.core import State
|
||||||
|
|
||||||
|
from tests.common import async_mock_service
|
||||||
|
|
||||||
|
|
||||||
|
async def test_reproducing_states(hass, caplog):
|
||||||
|
"""Test reproducing Lock states."""
|
||||||
|
hass.states.async_set("lock.entity_locked", "locked", {})
|
||||||
|
hass.states.async_set("lock.entity_unlocked", "unlocked", {})
|
||||||
|
|
||||||
|
lock_calls = async_mock_service(hass, "lock", "lock")
|
||||||
|
unlock_calls = async_mock_service(hass, "lock", "unlock")
|
||||||
|
|
||||||
|
# These calls should do nothing as entities already in desired state
|
||||||
|
await hass.helpers.state.async_reproduce_state(
|
||||||
|
[
|
||||||
|
State("lock.entity_locked", "locked"),
|
||||||
|
State("lock.entity_unlocked", "unlocked", {}),
|
||||||
|
],
|
||||||
|
blocking=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(lock_calls) == 0
|
||||||
|
assert len(unlock_calls) == 0
|
||||||
|
|
||||||
|
# Test invalid state is handled
|
||||||
|
await hass.helpers.state.async_reproduce_state(
|
||||||
|
[State("lock.entity_locked", "not_supported")], blocking=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert "not_supported" in caplog.text
|
||||||
|
assert len(lock_calls) == 0
|
||||||
|
assert len(unlock_calls) == 0
|
||||||
|
|
||||||
|
# Make sure correct services are called
|
||||||
|
await hass.helpers.state.async_reproduce_state(
|
||||||
|
[
|
||||||
|
State("lock.entity_locked", "unlocked"),
|
||||||
|
State("lock.entity_unlocked", "locked"),
|
||||||
|
# Should not raise
|
||||||
|
State("lock.non_existing", "on"),
|
||||||
|
],
|
||||||
|
blocking=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(lock_calls) == 1
|
||||||
|
assert lock_calls[0].domain == "lock"
|
||||||
|
assert lock_calls[0].data == {"entity_id": "lock.entity_unlocked"}
|
||||||
|
|
||||||
|
assert len(unlock_calls) == 1
|
||||||
|
assert unlock_calls[0].domain == "lock"
|
||||||
|
assert unlock_calls[0].data == {"entity_id": "lock.entity_locked"}
|
Loading…
x
Reference in New Issue
Block a user