Add code_format_template to template locks (#106947)

* Add code_format to template locks

* Replace code_format with code_format_template

* Add test case for template eval to None

* Apply suggestion to not call super()

Co-authored-by: Erik Montnemery <erik@montnemery.com>

* Add more negative tests

* Handle template render errors

* Better error message

* Add custom test lock config for code format

* Add type hints from upstream

---------

Co-authored-by: Erik Montnemery <erik@montnemery.com>
Co-authored-by: Franck Nijhof <git@frenck.dev>
This commit is contained in:
chammp 2024-06-10 16:54:17 +02:00 committed by GitHub
parent c896458fcf
commit 52379ad7cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 342 additions and 7 deletions

View File

@ -14,6 +14,7 @@ from homeassistant.components.lock import (
LockEntity,
)
from homeassistant.const import (
ATTR_CODE,
CONF_NAME,
CONF_OPTIMISTIC,
CONF_UNIQUE_ID,
@ -23,7 +24,7 @@ from homeassistant.const import (
STATE_UNLOCKED,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import TemplateError
from homeassistant.exceptions import ServiceValidationError, TemplateError
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.script import Script
@ -36,6 +37,7 @@ from .template_entity import (
rewrite_common_legacy_to_modern_conf,
)
CONF_CODE_FORMAT_TEMPLATE = "code_format_template"
CONF_LOCK = "lock"
CONF_UNLOCK = "unlock"
@ -48,6 +50,7 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
vol.Required(CONF_LOCK): cv.SCRIPT_SCHEMA,
vol.Required(CONF_UNLOCK): cv.SCRIPT_SCHEMA,
vol.Required(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_CODE_FORMAT_TEMPLATE): cv.template,
vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
vol.Optional(CONF_UNIQUE_ID): cv.string,
}
@ -90,6 +93,9 @@ class TemplateLock(TemplateEntity, LockEntity):
self._state_template = config.get(CONF_VALUE_TEMPLATE)
self._command_lock = Script(hass, config[CONF_LOCK], name, DOMAIN)
self._command_unlock = Script(hass, config[CONF_UNLOCK], name, DOMAIN)
self._code_format_template = config.get(CONF_CODE_FORMAT_TEMPLATE)
self._code_format = None
self._code_format_template_error = None
self._optimistic = config.get(CONF_OPTIMISTIC)
self._attr_assumed_state = bool(self._optimistic)
@ -115,6 +121,7 @@ class TemplateLock(TemplateEntity, LockEntity):
@callback
def _update_state(self, result):
"""Update the state from the template."""
super()._update_state(result)
if isinstance(result, TemplateError):
self._state = None
@ -130,24 +137,75 @@ class TemplateLock(TemplateEntity, LockEntity):
self._state = None
@property
def code_format(self) -> str | None:
"""Regex for code format or None if no code is required."""
return self._code_format
@callback
def _async_setup_templates(self) -> None:
"""Set up templates."""
self.add_template_attribute(
"_state", self._state_template, None, self._update_state
)
if self._code_format_template:
self.add_template_attribute(
"_code_format_template",
self._code_format_template,
None,
self._update_code_format,
)
super()._async_setup_templates()
@callback
def _update_code_format(self, render: str | TemplateError | None):
"""Update code format from the template."""
if isinstance(render, TemplateError):
self._code_format = None
self._code_format_template_error = render
elif render in (None, "None", ""):
self._code_format = None
self._code_format_template_error = None
else:
self._code_format = render
self._code_format_template_error = None
async def async_lock(self, **kwargs: Any) -> None:
"""Lock the device."""
self._raise_template_error_if_available()
if self._optimistic:
self._state = True
self.async_write_ha_state()
await self.async_run_script(self._command_lock, context=self._context)
tpl_vars = {ATTR_CODE: kwargs.get(ATTR_CODE) if kwargs else None}
await self.async_run_script(
self._command_lock, run_variables=tpl_vars, context=self._context
)
async def async_unlock(self, **kwargs: Any) -> None:
"""Unlock the device."""
self._raise_template_error_if_available()
if self._optimistic:
self._state = False
self.async_write_ha_state()
await self.async_run_script(self._command_unlock, context=self._context)
tpl_vars = {ATTR_CODE: kwargs.get(ATTR_CODE) if kwargs else None}
await self.async_run_script(
self._command_unlock, run_variables=tpl_vars, context=self._context
)
def _raise_template_error_if_available(self):
if self._code_format_template_error is not None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="code_format_template_error",
translation_placeholders={
"entity_id": self.entity_id,
"code_format_template": self._code_format_template.template,
"cause": str(self._code_format_template_error),
},
)

View File

@ -153,5 +153,10 @@
"name": "[%key:common::action::reload%]",
"description": "Reloads template entities from the YAML-configuration."
}
},
"exceptions": {
"code_format_template_error": {
"message": "Error evaluating code format template \"{code_format_template}\" for {entity_id}: {cause}"
}
}
}

View File

@ -4,7 +4,13 @@ import pytest
from homeassistant import setup
from homeassistant.components import lock
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from homeassistant.const import (
ATTR_CODE,
ATTR_ENTITY_ID,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.core import HomeAssistant, ServiceCall
OPTIMISTIC_LOCK_CONFIG = {
@ -25,6 +31,26 @@ OPTIMISTIC_LOCK_CONFIG = {
},
}
OPTIMISTIC_CODED_LOCK_CONFIG = {
"platform": "template",
"lock": {
"service": "test.automation",
"data_template": {
"action": "lock",
"caller": "{{ this.entity_id }}",
"code": "{{ code }}",
},
},
"unlock": {
"service": "test.automation",
"data_template": {
"action": "unlock",
"caller": "{{ this.entity_id }}",
"code": "{{ code }}",
},
},
}
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize(
@ -138,10 +164,24 @@ async def test_template_state_boolean_off(hass: HomeAssistant, start_ha) -> None
},
}
},
{
lock.DOMAIN: {
**OPTIMISTIC_LOCK_CONFIG,
"value_template": "{{ 1 == 1 }}",
"code_format_template": "{{ rubbish }",
}
},
{
lock.DOMAIN: {
**OPTIMISTIC_LOCK_CONFIG,
"value_template": "{{ 1 == 1 }}",
"code_format_template": "{% if rubbish %}",
}
},
],
)
async def test_template_syntax_error(hass: HomeAssistant, start_ha) -> None:
"""Test templating syntax error."""
"""Test templating syntax errors don't create entities."""
assert hass.states.async_all("lock") == []
@ -192,7 +232,9 @@ async def test_lock_action(
assert state.state == lock.STATE_UNLOCKED
await hass.services.async_call(
lock.DOMAIN, lock.SERVICE_LOCK, {ATTR_ENTITY_ID: "lock.template_lock"}
lock.DOMAIN,
lock.SERVICE_LOCK,
{ATTR_ENTITY_ID: "lock.template_lock"},
)
await hass.async_block_till_done()
@ -225,7 +267,9 @@ async def test_unlock_action(
assert state.state == lock.STATE_LOCKED
await hass.services.async_call(
lock.DOMAIN, lock.SERVICE_UNLOCK, {ATTR_ENTITY_ID: "lock.template_lock"}
lock.DOMAIN,
lock.SERVICE_UNLOCK,
{ATTR_ENTITY_ID: "lock.template_lock"},
)
await hass.async_block_till_done()
@ -234,6 +278,234 @@ async def test_unlock_action(
assert calls[0].data["caller"] == "lock.template_lock"
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize(
"config",
[
{
lock.DOMAIN: {
**OPTIMISTIC_CODED_LOCK_CONFIG,
"value_template": "{{ states.switch.test_state.state }}",
"code_format_template": "{{ '.+' }}",
}
},
],
)
async def test_lock_action_with_code(
hass: HomeAssistant, start_ha, calls: list[ServiceCall]
) -> None:
"""Test lock action with defined code format and supplied lock code."""
await setup.async_setup_component(hass, "switch", {})
hass.states.async_set("switch.test_state", STATE_OFF)
await hass.async_block_till_done()
state = hass.states.get("lock.template_lock")
assert state.state == lock.STATE_UNLOCKED
await hass.services.async_call(
lock.DOMAIN,
lock.SERVICE_LOCK,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "LOCK_CODE"},
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["action"] == "lock"
assert calls[0].data["caller"] == "lock.template_lock"
assert calls[0].data["code"] == "LOCK_CODE"
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize(
"config",
[
{
lock.DOMAIN: {
**OPTIMISTIC_CODED_LOCK_CONFIG,
"value_template": "{{ states.switch.test_state.state }}",
"code_format_template": "{{ '.+' }}",
}
},
],
)
async def test_unlock_action_with_code(
hass: HomeAssistant, start_ha, calls: list[ServiceCall]
) -> None:
"""Test unlock action with code format and supplied unlock code."""
await setup.async_setup_component(hass, "switch", {})
hass.states.async_set("switch.test_state", STATE_ON)
await hass.async_block_till_done()
state = hass.states.get("lock.template_lock")
assert state.state == lock.STATE_LOCKED
await hass.services.async_call(
lock.DOMAIN,
lock.SERVICE_UNLOCK,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "UNLOCK_CODE"},
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["action"] == "unlock"
assert calls[0].data["caller"] == "lock.template_lock"
assert calls[0].data["code"] == "UNLOCK_CODE"
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize(
"config",
[
{
lock.DOMAIN: {
**OPTIMISTIC_LOCK_CONFIG,
"value_template": "{{ 1 == 1 }}",
"code_format_template": "{{ '\\\\d+' }}",
}
},
],
)
@pytest.mark.parametrize(
"test_action",
[
lock.SERVICE_LOCK,
lock.SERVICE_UNLOCK,
],
)
async def test_lock_actions_fail_with_invalid_code(
hass: HomeAssistant, start_ha, calls: list[ServiceCall], test_action
) -> None:
"""Test invalid lock codes."""
await hass.services.async_call(
lock.DOMAIN,
test_action,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "non-number-value"},
)
await hass.services.async_call(
lock.DOMAIN,
test_action,
{ATTR_ENTITY_ID: "lock.template_lock"},
)
await hass.async_block_till_done()
assert len(calls) == 0
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize(
"config",
[
{
lock.DOMAIN: {
**OPTIMISTIC_LOCK_CONFIG,
"value_template": "{{ 1 == 1 }}",
"code_format_template": "{{ 1/0 }}",
}
},
],
)
async def test_lock_actions_dont_execute_with_code_template_rendering_error(
hass: HomeAssistant, start_ha, calls: list[ServiceCall]
) -> None:
"""Test lock code format rendering fails block lock/unlock actions."""
await hass.services.async_call(
lock.DOMAIN,
lock.SERVICE_LOCK,
{ATTR_ENTITY_ID: "lock.template_lock"},
)
await hass.services.async_call(
lock.DOMAIN,
lock.SERVICE_UNLOCK,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "any-value"},
)
await hass.async_block_till_done()
assert len(calls) == 0
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize("action", [lock.SERVICE_LOCK, lock.SERVICE_UNLOCK])
@pytest.mark.parametrize(
"config",
[
{
lock.DOMAIN: {
**OPTIMISTIC_CODED_LOCK_CONFIG,
"value_template": "{{ states.switch.test_state.state }}",
"code_format_template": "{{ None }}",
}
},
],
)
async def test_actions_with_none_as_codeformat_ignores_code(
hass: HomeAssistant, action, start_ha, calls: list[ServiceCall]
) -> None:
"""Test lock actions with supplied lock code."""
await setup.async_setup_component(hass, "switch", {})
hass.states.async_set("switch.test_state", STATE_OFF)
await hass.async_block_till_done()
state = hass.states.get("lock.template_lock")
assert state.state == lock.STATE_UNLOCKED
await hass.services.async_call(
lock.DOMAIN,
action,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "any code"},
)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["action"] == action
assert calls[0].data["caller"] == "lock.template_lock"
assert calls[0].data["code"] == "any code"
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize("action", [lock.SERVICE_LOCK, lock.SERVICE_UNLOCK])
@pytest.mark.parametrize(
"config",
[
{
lock.DOMAIN: {
**OPTIMISTIC_LOCK_CONFIG,
"value_template": "{{ states.switch.test_state.state }}",
"code_format_template": "[12]{1",
}
},
],
)
async def test_actions_with_invalid_regexp_as_codeformat_never_execute(
hass: HomeAssistant, action, start_ha, calls: list[ServiceCall]
) -> None:
"""Test lock actions don't execute with invalid regexp."""
await setup.async_setup_component(hass, "switch", {})
hass.states.async_set("switch.test_state", STATE_OFF)
await hass.async_block_till_done()
state = hass.states.get("lock.template_lock")
assert state.state == lock.STATE_UNLOCKED
await hass.services.async_call(
lock.DOMAIN,
action,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "1"},
)
await hass.services.async_call(
lock.DOMAIN,
action,
{ATTR_ENTITY_ID: "lock.template_lock", ATTR_CODE: "x"},
)
await hass.services.async_call(
lock.DOMAIN,
action,
{ATTR_ENTITY_ID: "lock.template_lock"},
)
await hass.async_block_till_done()
assert len(calls) == 0
@pytest.mark.parametrize(("count", "domain"), [(1, lock.DOMAIN)])
@pytest.mark.parametrize(
"config",