Add trigger based entities to template switch (#141763)

* Add trigger based entities to template switch platform

* add suggestions
This commit is contained in:
Petro31 2025-04-29 03:40:16 -04:00 committed by GitHub
parent 6423957d29
commit b2fcab20a6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 356 additions and 20 deletions

View File

@ -124,7 +124,7 @@ CONFIG_SECTION_SCHEMA = vol.Schema(
),
},
ensure_domains_do_not_have_trigger_or_action(
BUTTON_DOMAIN, COVER_DOMAIN, LIGHT_DOMAIN, SWITCH_DOMAIN
BUTTON_DOMAIN, COVER_DOMAIN, LIGHT_DOMAIN
),
)
)

View File

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any
import voluptuous as vol
from homeassistant.components.switch import (
DOMAIN as SWITCH_DOMAIN,
ENTITY_ID_FORMAT,
PLATFORM_SCHEMA as SWITCH_PLATFORM_SCHEMA,
SwitchEntity,
@ -23,6 +24,8 @@ from homeassistant.const import (
CONF_VALUE_TEMPLATE,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import TemplateError
@ -36,6 +39,7 @@ from homeassistant.helpers.entity_platform import (
from homeassistant.helpers.restore_state import RestoreEntity
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import TriggerUpdateCoordinator
from .const import CONF_OBJECT_ID, CONF_PICTURE, CONF_TURN_OFF, CONF_TURN_ON, DOMAIN
from .template_entity import (
LEGACY_FIELDS as TEMPLATE_ENTITY_LEGACY_FIELDS,
@ -45,6 +49,7 @@ from .template_entity import (
TemplateEntity,
rewrite_common_legacy_to_modern_conf,
)
from .trigger_entity import TriggerEntity
_VALID_STATES = [STATE_ON, STATE_OFF, "true", "false"]
@ -173,6 +178,13 @@ async def async_setup_platform(
)
return
if "coordinator" in discovery_info:
async_add_entities(
TriggerSwitchEntity(hass, discovery_info["coordinator"], config)
for config in discovery_info["entities"]
)
return
_async_create_template_tracking_entities(
async_add_entities,
hass,
@ -295,3 +307,83 @@ class SwitchTemplate(TemplateEntity, SwitchEntity, RestoreEntity):
if self._template is None:
self._state = False
self.async_write_ha_state()
class TriggerSwitchEntity(TriggerEntity, SwitchEntity, RestoreEntity):
"""Switch entity based on trigger data."""
domain = SWITCH_DOMAIN
def __init__(
self,
hass: HomeAssistant,
coordinator: TriggerUpdateCoordinator,
config: ConfigType,
) -> None:
"""Initialize the entity."""
super().__init__(hass, coordinator, config)
name = self._rendered.get(CONF_NAME, DEFAULT_NAME)
self._template = config.get(CONF_STATE)
if on_action := config.get(CONF_TURN_ON):
self.add_script(CONF_TURN_ON, on_action, name, DOMAIN)
if off_action := config.get(CONF_TURN_OFF):
self.add_script(CONF_TURN_OFF, off_action, name, DOMAIN)
self._attr_assumed_state = self._template is None
if not self._attr_assumed_state:
self._to_render_simple.append(CONF_STATE)
self._parse_result.add(CONF_STATE)
self._attr_device_info = async_device_info_to_link_from_device_id(
hass,
config.get(CONF_DEVICE_ID),
)
async def async_added_to_hass(self) -> None:
"""Restore last state."""
await super().async_added_to_hass()
if (
(last_state := await self.async_get_last_state()) is not None
and last_state.state not in (STATE_UNKNOWN, STATE_UNAVAILABLE)
# The trigger might have fired already while we waited for stored data,
# then we should not restore state
and self.is_on is None
):
self._attr_is_on = last_state.state == STATE_ON
self.restore_attributes(last_state)
@callback
def _handle_coordinator_update(self) -> None:
"""Handle update of the data."""
self._process_data()
if not self.available:
self.async_write_ha_state()
return
if not self._attr_assumed_state:
raw = self._rendered.get(CONF_STATE)
self._attr_is_on = template.result_as_boolean(raw)
self.async_set_context(self.coordinator.data["context"])
self.async_write_ha_state()
elif self._attr_assumed_state and len(self._rendered) > 0:
# In case name, icon, or friendly name have a template but
# states does not
self.async_write_ha_state()
async def async_turn_on(self, **kwargs: Any) -> None:
"""Fire the on action."""
if on_script := self._action_scripts.get(CONF_TURN_ON):
await self.async_run_script(on_script, context=self._context)
if self._template is None:
self._attr_is_on = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Fire the off action."""
if off_script := self._action_scripts.get(CONF_TURN_OFF):
await self.async_run_script(off_script, context=self._context)
if self._template is None:
self._attr_is_on = False
self.async_write_ha_state()

View File

@ -65,7 +65,9 @@ class TriggerEntity( # pylint: disable=hass-enforce-class-module
"""Render configured variables."""
if self.coordinator.data is None:
return {}
return self.coordinator.data["run_variables"] or {}
if self.coordinator.data is None:
return {}
return self.coordinator.data["run_variables"] or {} or {}
def _render_templates(self, variables: dict[str, Any]) -> None:
"""Render templates."""

View File

@ -16,6 +16,7 @@ class ConfigurationStyle(Enum):
LEGACY = "Legacy"
MODERN = "Modern"
TRIGGER = "Trigger"
@pytest.fixture

View File

@ -37,6 +37,12 @@ TEST_OBJECT_ID = "test_template_switch"
TEST_ENTITY_ID = f"switch.{TEST_OBJECT_ID}"
TEST_STATE_ENTITY_ID = "switch.test_state"
TEST_EVENT_TRIGGER = {
"trigger": {"platform": "event", "event_type": "test_event"},
"variables": {"type": "{{ trigger.event.data.type }}"},
"action": [{"event": "action_event", "event_data": {"type": "{{ type }}"}}],
}
SWITCH_TURN_ON = {
"service": "test.automation",
"data_template": {
@ -100,6 +106,33 @@ async def async_setup_modern_format(
await hass.async_block_till_done()
async def async_setup_trigger_format(
hass: HomeAssistant, count: int, switch_config: dict[str, Any]
) -> None:
"""Do setup of switch integration via modern format."""
config = {"template": {**TEST_EVENT_TRIGGER, "switch": switch_config}}
with assert_setup_component(count, template.DOMAIN):
assert await async_setup_component(
hass,
template.DOMAIN,
config,
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
async def async_ensure_triggered_entity_updates(
hass: HomeAssistant, style: ConfigurationStyle, **kwargs
) -> None:
"""Trigger template entities."""
if style == ConfigurationStyle.TRIGGER:
hass.bus.async_fire("test_event", {"type": "test_event", **kwargs})
await hass.async_block_till_done()
@pytest.fixture
async def setup_switch(
hass: HomeAssistant,
@ -112,6 +145,8 @@ async def setup_switch(
await async_setup_legacy_format(hass, count, switch_config)
elif style == ConfigurationStyle.MODERN:
await async_setup_modern_format(hass, count, switch_config)
elif style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(hass, count, switch_config)
@pytest.fixture
@ -142,6 +177,15 @@ async def setup_state_switch(
"state": state_template,
},
)
elif style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(
hass,
count,
{
**NAMED_SWITCH_ACTIONS,
"state": state_template,
},
)
@pytest.fixture
@ -176,6 +220,16 @@ async def setup_single_attribute_switch(
**extra,
},
)
elif style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(
hass,
count,
{
**NAMED_SWITCH_ACTIONS,
"state": "{{ 1 == 1 }}",
**extra,
},
)
@pytest.fixture
@ -203,6 +257,55 @@ async def setup_optimistic_switch(
**NAMED_SWITCH_ACTIONS,
},
)
elif style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(
hass,
count,
{
**NAMED_SWITCH_ACTIONS,
},
)
@pytest.fixture
async def setup_single_attribute_optimistic_switch(
hass: HomeAssistant,
count: int,
style: ConfigurationStyle,
attribute: str,
attribute_template: str,
) -> None:
"""Do setup of switch integration testing a single attribute."""
extra = {attribute: attribute_template} if attribute and attribute_template else {}
if style == ConfigurationStyle.LEGACY:
await async_setup_legacy_format(
hass,
count,
{
TEST_OBJECT_ID: {
**SWITCH_ACTIONS,
**extra,
}
},
)
elif style == ConfigurationStyle.MODERN:
await async_setup_modern_format(
hass,
count,
{
**NAMED_SWITCH_ACTIONS,
**extra,
},
)
elif style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(
hass,
count,
{
**NAMED_SWITCH_ACTIONS,
**extra,
},
)
async def test_legacy_to_modern_config(hass: HomeAssistant) -> None:
@ -238,10 +341,14 @@ async def test_legacy_to_modern_config(hass: HomeAssistant) -> None:
@pytest.mark.parametrize(("count", "state_template"), [(1, "{{ True }}")])
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_setup(hass: HomeAssistant, setup_state_switch) -> None:
async def test_setup(
hass: HomeAssistant, style: ConfigurationStyle, setup_state_switch
) -> None:
"""Test template."""
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state is not None
assert state.name == TEST_OBJECT_ID
@ -326,19 +433,26 @@ async def test_flow_preview(
("count", "state_template"), [(1, "{{ states.switch.test_state.state }}")]
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_template_state_text(hass: HomeAssistant, setup_state_switch) -> None:
async def test_template_state_text(
hass: HomeAssistant, style: ConfigurationStyle, setup_state_switch
) -> None:
"""Test the state text of a template."""
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_ON)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_ON
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_OFF)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_OFF
@ -352,12 +466,14 @@ async def test_template_state_text(hass: HomeAssistant, setup_state_switch) -> N
],
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_template_state_boolean(
hass: HomeAssistant, expected: str, setup_state_switch
hass: HomeAssistant, expected: str, style: ConfigurationStyle, setup_state_switch
) -> None:
"""Test the setting of the state with boolean template."""
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == expected
@ -371,22 +487,107 @@ async def test_template_state_boolean(
[
(ConfigurationStyle.LEGACY, "icon_template"),
(ConfigurationStyle.MODERN, "icon"),
(ConfigurationStyle.TRIGGER, "icon"),
],
)
async def test_icon_template(
hass: HomeAssistant, setup_single_attribute_switch
hass: HomeAssistant, style: ConfigurationStyle, setup_single_attribute_switch
) -> None:
"""Test the state text of a template."""
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("icon") == ""
assert state.attributes.get("icon") in ("", None)
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_ON)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes["icon"] == "mdi:check"
@pytest.mark.parametrize(
("config_attr", "attribute", "expected"),
[("icon", "icon", "mdi:icon"), ("picture", "entity_picture", "picture.jpg")],
)
async def test_attributes_with_optimistic_state(
hass: HomeAssistant,
config_attr: str,
attribute: str,
expected: str,
calls: list[ServiceCall],
) -> None:
"""Test attributes when trigger entity is optimistic."""
await async_setup_trigger_format(
hass,
1,
{
**NAMED_SWITCH_ACTIONS,
config_attr: "{{ trigger.event.data.attr }}",
},
)
hass.states.async_set(TEST_ENTITY_ID, STATE_OFF)
await hass.async_block_till_done()
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_OFF
assert state.attributes.get(attribute) is None
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: TEST_ENTITY_ID},
blocking=True,
)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_ON
assert state.attributes.get(attribute) is None
assert len(calls) == 1
assert calls[-1].data["action"] == "turn_on"
assert calls[-1].data["caller"] == TEST_ENTITY_ID
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: TEST_ENTITY_ID},
blocking=True,
)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_OFF
assert state.attributes.get(attribute) is None
assert len(calls) == 2
assert calls[-1].data["action"] == "turn_off"
assert calls[-1].data["caller"] == TEST_ENTITY_ID
await async_ensure_triggered_entity_updates(
hass, ConfigurationStyle.TRIGGER, attr=expected
)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_OFF
assert state.attributes.get(attribute) == expected
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: TEST_ENTITY_ID},
blocking=True,
)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_ON
assert state.attributes.get(attribute) == expected
assert len(calls) == 3
assert calls[-1].data["action"] == "turn_on"
assert calls[-1].data["caller"] == TEST_ENTITY_ID
@pytest.mark.parametrize(
("count", "attribute_template"),
[(1, "{% if states.switch.test_state.state %}/local/switch.png{% endif %}")],
@ -396,18 +597,21 @@ async def test_icon_template(
[
(ConfigurationStyle.LEGACY, "entity_picture_template"),
(ConfigurationStyle.MODERN, "picture"),
(ConfigurationStyle.TRIGGER, "picture"),
],
)
async def test_entity_picture_template(
hass: HomeAssistant, setup_single_attribute_switch
hass: HomeAssistant, style: ConfigurationStyle, setup_single_attribute_switch
) -> None:
"""Test entity_picture template."""
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("entity_picture") == ""
assert state.attributes.get("entity_picture") in ("", None)
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_ON)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes["entity_picture"] == "/local/switch.png"
@ -415,7 +619,7 @@ async def test_entity_picture_template(
@pytest.mark.parametrize(("count", "state_template"), [(0, "{% if rubbish %}")])
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN],
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_template_syntax_error(hass: HomeAssistant, setup_state_switch) -> None:
"""Test templating syntax error."""
@ -613,15 +817,21 @@ async def test_missing_off_does_not_create(
("count", "state_template"), [(1, "{{ states('switch.test_state') }}")]
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_on_action(
hass: HomeAssistant, setup_state_switch, calls: list[ServiceCall]
hass: HomeAssistant,
style: ConfigurationStyle,
setup_state_switch,
calls: list[ServiceCall],
) -> None:
"""Test on action."""
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_OFF)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_OFF
@ -639,7 +849,8 @@ async def test_on_action(
@pytest.mark.parametrize("count", [1])
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_on_action_optimistic(
hass: HomeAssistant, setup_optimistic_switch, calls: list[ServiceCall]
@ -670,15 +881,21 @@ async def test_on_action_optimistic(
("count", "state_template"), [(1, "{{ states.switch.test_state.state }}")]
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_off_action(
hass: HomeAssistant, setup_state_switch, calls: list[ServiceCall]
hass: HomeAssistant,
style: ConfigurationStyle,
setup_state_switch,
calls: list[ServiceCall],
) -> None:
"""Test off action."""
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_ON)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
state = hass.states.get(TEST_ENTITY_ID)
assert state.state == STATE_ON
@ -696,7 +913,8 @@ async def test_off_action(
@pytest.mark.parametrize("count", [1])
@pytest.mark.parametrize(
"style", [ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN]
"style",
[ConfigurationStyle.LEGACY, ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_off_action_optimistic(
hass: HomeAssistant, setup_optimistic_switch, calls: list[ServiceCall]
@ -760,6 +978,24 @@ async def test_off_action_optimistic(
},
template.DOMAIN,
),
(
{
"template": {
"trigger": {"trigger": "event", "event_type": "test_event"},
"switch": [
{
"name": "s1",
**SWITCH_ACTIONS,
},
{
"name": "s2",
**SWITCH_ACTIONS,
},
],
}
},
template.DOMAIN,
),
],
)
async def test_restore_state(
@ -800,20 +1036,25 @@ async def test_restore_state(
[
(ConfigurationStyle.LEGACY, "availability_template"),
(ConfigurationStyle.MODERN, "availability"),
(ConfigurationStyle.TRIGGER, "availability"),
],
)
async def test_available_template_with_entities(
hass: HomeAssistant, setup_single_attribute_switch
hass: HomeAssistant, style: ConfigurationStyle, setup_single_attribute_switch
) -> None:
"""Test availability templates with values from other entities."""
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_ON)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
assert hass.states.get(TEST_ENTITY_ID).state != STATE_UNAVAILABLE
hass.states.async_set(TEST_STATE_ENTITY_ID, STATE_OFF)
await hass.async_block_till_done()
await async_ensure_triggered_entity_updates(hass, style)
assert hass.states.get(TEST_ENTITY_ID).state == STATE_UNAVAILABLE