Refactor lock default code handling (#104807)

This commit is contained in:
Jan Bouwhuis 2023-12-04 14:59:51 +01:00 committed by GitHub
parent 13fdac23c1
commit 7d21ed41a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 88 deletions

View File

@ -24,7 +24,7 @@ from homeassistant.const import (
STATE_UNLOCKED,
STATE_UNLOCKING,
)
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.config_validation import ( # noqa: F401
PLATFORM_SCHEMA,
@ -33,7 +33,6 @@ from homeassistant.helpers.config_validation import ( # noqa: F401
)
from homeassistant.helpers.entity import Entity, EntityDescription
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.service import remove_entity_service_fields
from homeassistant.helpers.typing import ConfigType, StateType
_LOGGER = logging.getLogger(__name__)
@ -75,48 +74,21 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
await component.async_setup(config)
component.async_register_entity_service(
SERVICE_UNLOCK, LOCK_SERVICE_SCHEMA, _async_unlock
SERVICE_UNLOCK, LOCK_SERVICE_SCHEMA, "async_handle_unlock_service"
)
component.async_register_entity_service(
SERVICE_LOCK, LOCK_SERVICE_SCHEMA, _async_lock
SERVICE_LOCK, LOCK_SERVICE_SCHEMA, "async_handle_lock_service"
)
component.async_register_entity_service(
SERVICE_OPEN, LOCK_SERVICE_SCHEMA, _async_open, [LockEntityFeature.OPEN]
SERVICE_OPEN,
LOCK_SERVICE_SCHEMA,
"async_handle_open_service",
[LockEntityFeature.OPEN],
)
return True
@callback
def _add_default_code(entity: LockEntity, service_call: ServiceCall) -> dict[Any, Any]:
data = remove_entity_service_fields(service_call)
code: str = data.pop(ATTR_CODE, "")
if not code:
code = entity._lock_option_default_code # pylint: disable=protected-access
if entity.code_format_cmp and not entity.code_format_cmp.match(code):
raise ValueError(
f"Code '{code}' for locking {entity.entity_id} doesn't match pattern {entity.code_format}"
)
if code:
data[ATTR_CODE] = code
return data
async def _async_lock(entity: LockEntity, service_call: ServiceCall) -> None:
"""Lock the lock."""
await entity.async_lock(**_add_default_code(entity, service_call))
async def _async_unlock(entity: LockEntity, service_call: ServiceCall) -> None:
"""Unlock the lock."""
await entity.async_unlock(**_add_default_code(entity, service_call))
async def _async_open(entity: LockEntity, service_call: ServiceCall) -> None:
"""Open the door latch."""
await entity.async_open(**_add_default_code(entity, service_call))
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a config entry."""
component: EntityComponent[LockEntity] = hass.data[DOMAIN]
@ -149,6 +121,21 @@ class LockEntity(Entity):
_lock_option_default_code: str = ""
__code_format_cmp: re.Pattern[str] | None = None
@final
@callback
def add_default_code(self, data: dict[Any, Any]) -> dict[Any, Any]:
"""Add default lock code."""
code: str = data.pop(ATTR_CODE, "")
if not code:
code = self._lock_option_default_code
if self.code_format_cmp and not self.code_format_cmp.match(code):
raise ValueError(
f"Code '{code}' for locking {self.entity_id} doesn't match pattern {self.code_format}"
)
if code:
data[ATTR_CODE] = code
return data
@property
def changed_by(self) -> str | None:
"""Last change triggered by."""
@ -193,6 +180,11 @@ class LockEntity(Entity):
"""Return true if the lock is jammed (incomplete locking)."""
return self._attr_is_jammed
@final
async def async_handle_lock_service(self, **kwargs: Any) -> None:
"""Add default code and lock."""
await self.async_lock(**self.add_default_code(kwargs))
def lock(self, **kwargs: Any) -> None:
"""Lock the lock."""
raise NotImplementedError()
@ -201,6 +193,11 @@ class LockEntity(Entity):
"""Lock the lock."""
await self.hass.async_add_executor_job(ft.partial(self.lock, **kwargs))
@final
async def async_handle_unlock_service(self, **kwargs: Any) -> None:
"""Add default code and unlock."""
await self.async_unlock(**self.add_default_code(kwargs))
def unlock(self, **kwargs: Any) -> None:
"""Unlock the lock."""
raise NotImplementedError()
@ -209,6 +206,11 @@ class LockEntity(Entity):
"""Unlock the lock."""
await self.hass.async_add_executor_job(ft.partial(self.unlock, **kwargs))
@final
async def async_handle_open_service(self, **kwargs: Any) -> None:
"""Add default code and open."""
await self.async_open(**self.add_default_code(kwargs))
def open(self, **kwargs: Any) -> None:
"""Open the door latch."""
raise NotImplementedError()

View File

@ -9,10 +9,6 @@ import pytest
from homeassistant.components.lock import (
ATTR_CODE,
CONF_DEFAULT_CODE,
DOMAIN,
SERVICE_LOCK,
SERVICE_OPEN,
SERVICE_UNLOCK,
STATE_JAMMED,
STATE_LOCKED,
STATE_LOCKING,
@ -20,11 +16,8 @@ from homeassistant.components.lock import (
STATE_UNLOCKING,
LockEntity,
LockEntityFeature,
_async_lock,
_async_open,
_async_unlock,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.core import HomeAssistant
import homeassistant.helpers.entity_registry as er
from homeassistant.setup import async_setup_component
@ -87,7 +80,7 @@ async def test_lock_states(hass: HomeAssistant) -> None:
assert lock.is_locking
assert lock.state == STATE_LOCKING
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {}))
await lock.async_handle_lock_service()
assert lock.is_locked
assert lock.state == STATE_LOCKED
@ -95,7 +88,7 @@ async def test_lock_states(hass: HomeAssistant) -> None:
assert lock.is_unlocking
assert lock.state == STATE_UNLOCKING
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {}))
await lock.async_handle_unlock_service()
assert not lock.is_locked
assert lock.state == STATE_UNLOCKED
@ -189,12 +182,12 @@ async def test_lock_open_with_code(hass: HomeAssistant) -> None:
assert lock.state_attributes == {"code_format": r"^\d{4}$"}
with pytest.raises(ValueError):
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {}))
await lock.async_handle_open_service()
with pytest.raises(ValueError):
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: ""}))
await lock.async_handle_open_service(code="")
with pytest.raises(ValueError):
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: "HELLO"}))
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: "1234"}))
await lock.async_handle_open_service(code="HELLO")
await lock.async_handle_open_service(code="1234")
assert lock.calls_open.call_count == 1
@ -203,16 +196,16 @@ async def test_lock_lock_with_code(hass: HomeAssistant) -> None:
lock = MockLockEntity(code_format=r"^\d{4}$")
lock.hass = hass
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: "1234"}))
await lock.async_handle_unlock_service(code="1234")
assert not lock.is_locked
with pytest.raises(ValueError):
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {}))
await lock.async_handle_lock_service()
with pytest.raises(ValueError):
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: ""}))
await lock.async_handle_lock_service(code="")
with pytest.raises(ValueError):
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: "HELLO"}))
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: "1234"}))
await lock.async_handle_lock_service(code="HELLO")
await lock.async_handle_lock_service(code="1234")
assert lock.is_locked
@ -221,18 +214,16 @@ async def test_lock_unlock_with_code(hass: HomeAssistant) -> None:
lock = MockLockEntity(code_format=r"^\d{4}$")
lock.hass = hass
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: "1234"}))
await lock.async_handle_lock_service(code="1234")
assert lock.is_locked
with pytest.raises(ValueError):
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {}))
await lock.async_handle_unlock_service()
with pytest.raises(ValueError):
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: ""}))
await lock.async_handle_unlock_service(code="")
with pytest.raises(ValueError):
await _async_unlock(
lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: "HELLO"})
)
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: "1234"}))
await lock.async_handle_unlock_service(code="HELLO")
await lock.async_handle_unlock_service(code="1234")
assert not lock.is_locked
@ -245,17 +236,11 @@ async def test_lock_with_illegal_code(hass: HomeAssistant) -> None:
lock.hass = hass
with pytest.raises(ValueError):
await _async_open(
lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: "123456"})
)
await lock.async_handle_open_service(code="123456")
with pytest.raises(ValueError):
await _async_lock(
lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: "123456"})
)
await lock.async_handle_lock_service(code="123456")
with pytest.raises(ValueError):
await _async_unlock(
lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: "123456"})
)
await lock.async_handle_unlock_service(code="123456")
async def test_lock_with_no_code(hass: HomeAssistant) -> None:
@ -265,18 +250,18 @@ async def test_lock_with_no_code(hass: HomeAssistant) -> None:
)
lock.hass = hass
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {}))
await lock.async_handle_open_service()
lock.calls_open.assert_called_with({})
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {}))
await lock.async_handle_lock_service()
lock.calls_lock.assert_called_with({})
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {}))
await lock.async_handle_unlock_service()
lock.calls_unlock.assert_called_with({})
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: ""}))
await lock.async_handle_open_service(code="")
lock.calls_open.assert_called_with({})
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: ""}))
await lock.async_handle_lock_service(code="")
lock.calls_lock.assert_called_with({})
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: ""}))
await lock.async_handle_unlock_service(code="")
lock.calls_unlock.assert_called_with({})
@ -292,18 +277,18 @@ async def test_lock_with_default_code(hass: HomeAssistant) -> None:
assert lock.state_attributes == {"code_format": r"^\d{4}$"}
assert lock._lock_option_default_code == "1234"
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {}))
await lock.async_handle_open_service()
lock.calls_open.assert_called_with({ATTR_CODE: "1234"})
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {}))
await lock.async_handle_lock_service()
lock.calls_lock.assert_called_with({ATTR_CODE: "1234"})
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {}))
await lock.async_handle_unlock_service()
lock.calls_unlock.assert_called_with({ATTR_CODE: "1234"})
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: ""}))
await lock.async_handle_open_service(code="")
lock.calls_open.assert_called_with({ATTR_CODE: "1234"})
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: ""}))
await lock.async_handle_lock_service(code="")
lock.calls_lock.assert_called_with({ATTR_CODE: "1234"})
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: ""}))
await lock.async_handle_unlock_service(code="")
lock.calls_unlock.assert_called_with({ATTR_CODE: "1234"})
@ -316,11 +301,11 @@ async def test_lock_with_provided_and_default_code(hass: HomeAssistant) -> None:
)
lock.hass = hass
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {ATTR_CODE: "4321"}))
await lock.async_handle_open_service(code="4321")
lock.calls_open.assert_called_with({ATTR_CODE: "4321"})
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {ATTR_CODE: "4321"}))
await lock.async_handle_lock_service(code="4321")
lock.calls_lock.assert_called_with({ATTR_CODE: "4321"})
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {ATTR_CODE: "4321"}))
await lock.async_handle_unlock_service(code="4321")
lock.calls_unlock.assert_called_with({ATTR_CODE: "4321"})
@ -337,8 +322,8 @@ async def test_lock_with_illegal_default_code(hass: HomeAssistant) -> None:
assert lock._lock_option_default_code == "123456"
with pytest.raises(ValueError):
await _async_open(lock, ServiceCall(DOMAIN, SERVICE_OPEN, {}))
await lock.async_handle_open_service()
with pytest.raises(ValueError):
await _async_lock(lock, ServiceCall(DOMAIN, SERVICE_LOCK, {}))
await lock.async_handle_lock_service()
with pytest.raises(ValueError):
await _async_unlock(lock, ServiceCall(DOMAIN, SERVICE_UNLOCK, {}))
await lock.async_handle_unlock_service()