Use entity registry id in select device conditions (#95259)

This commit is contained in:
Erik Montnemery 2023-06-26 20:30:29 +02:00 committed by GitHub
parent 4021662b48
commit eb7ad2eb09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 139 additions and 16 deletions

View File

@ -3,6 +3,9 @@ from __future__ import annotations
import voluptuous as vol import voluptuous as vol
from homeassistant.components.device_automation import (
async_get_entity_registry_entry_or_raise,
)
from homeassistant.const import ( from homeassistant.const import (
CONF_CONDITION, CONF_CONDITION,
CONF_DEVICE_ID, CONF_DEVICE_ID,
@ -30,7 +33,7 @@ CONDITION_TYPES = {"selected_option"}
CONDITION_SCHEMA = DEVICE_CONDITION_BASE_SCHEMA.extend( CONDITION_SCHEMA = DEVICE_CONDITION_BASE_SCHEMA.extend(
{ {
vol.Required(CONF_ENTITY_ID): cv.entity_id, vol.Required(CONF_ENTITY_ID): cv.entity_id_or_uuid,
vol.Required(CONF_TYPE): vol.In(CONDITION_TYPES), vol.Required(CONF_TYPE): vol.In(CONDITION_TYPES),
vol.Required(CONF_OPTION): str, vol.Required(CONF_OPTION): str,
vol.Optional(CONF_FOR): cv.positive_time_period_dict, vol.Optional(CONF_FOR): cv.positive_time_period_dict,
@ -48,7 +51,7 @@ async def async_get_conditions(
CONF_CONDITION: "device", CONF_CONDITION: "device",
CONF_DEVICE_ID: device_id, CONF_DEVICE_ID: device_id,
CONF_DOMAIN: DOMAIN, CONF_DOMAIN: DOMAIN,
CONF_ENTITY_ID: entry.entity_id, CONF_ENTITY_ID: entry.id,
CONF_TYPE: "selected_option", CONF_TYPE: "selected_option",
} }
for entry in er.async_entries_for_device(registry, device_id) for entry in er.async_entries_for_device(registry, device_id)
@ -62,11 +65,14 @@ def async_condition_from_config(
) -> condition.ConditionCheckerType: ) -> condition.ConditionCheckerType:
"""Create a function to test a device condition.""" """Create a function to test a device condition."""
registry = er.async_get(hass)
entity_id = er.async_resolve_entity_id(registry, config[CONF_ENTITY_ID])
@callback @callback
def test_is_state(hass: HomeAssistant, variables: TemplateVarsType) -> bool: def test_is_state(hass: HomeAssistant, variables: TemplateVarsType) -> bool:
"""Test if an entity is a certain state.""" """Test if an entity is a certain state."""
return condition.state( return condition.state(
hass, config[CONF_ENTITY_ID], config[CONF_OPTION], config.get(CONF_FOR) hass, entity_id, config[CONF_OPTION], config.get(CONF_FOR)
) )
return test_is_state return test_is_state
@ -76,8 +82,10 @@ async def async_get_condition_capabilities(
hass: HomeAssistant, config: ConfigType hass: HomeAssistant, config: ConfigType
) -> dict[str, vol.Schema]: ) -> dict[str, vol.Schema]:
"""List condition capabilities.""" """List condition capabilities."""
try: try:
options = get_capability(hass, config[CONF_ENTITY_ID], ATTR_OPTIONS) or [] entry = async_get_entity_registry_entry_or_raise(hass, config[CONF_ENTITY_ID])
options = get_capability(hass, entry.entity_id, ATTR_OPTIONS) or []
except HomeAssistantError: except HomeAssistantError:
options = [] options = []

View File

@ -45,7 +45,7 @@ async def test_get_conditions(
config_entry_id=config_entry.entry_id, config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
) )
entity_registry.async_get_or_create( entity_entry = entity_registry.async_get_or_create(
DOMAIN, "test", "5678", device_id=device_entry.id DOMAIN, "test", "5678", device_id=device_entry.id
) )
expected_conditions = [ expected_conditions = [
@ -54,7 +54,7 @@ async def test_get_conditions(
"domain": DOMAIN, "domain": DOMAIN,
"type": "selected_option", "type": "selected_option",
"device_id": device_entry.id, "device_id": device_entry.id,
"entity_id": f"{DOMAIN}.test_5678", "entity_id": entity_entry.id,
"metadata": {"secondary": False}, "metadata": {"secondary": False},
} }
] ]
@ -87,7 +87,7 @@ async def test_get_conditions_hidden_auxiliary(
config_entry_id=config_entry.entry_id, config_entry_id=config_entry.entry_id,
connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")}, connections={(dr.CONNECTION_NETWORK_MAC, "12:34:56:AB:CD:EF")},
) )
entity_registry.async_get_or_create( entity_entry = entity_registry.async_get_or_create(
DOMAIN, DOMAIN,
"test", "test",
"5678", "5678",
@ -101,7 +101,7 @@ async def test_get_conditions_hidden_auxiliary(
"domain": DOMAIN, "domain": DOMAIN,
"type": condition, "type": condition,
"device_id": device_entry.id, "device_id": device_entry.id,
"entity_id": f"{DOMAIN}.test_5678", "entity_id": entity_entry.id,
"metadata": {"secondary": True}, "metadata": {"secondary": True},
} }
for condition in ["selected_option"] for condition in ["selected_option"]
@ -113,9 +113,13 @@ async def test_get_conditions_hidden_auxiliary(
async def test_if_selected_option( async def test_if_selected_option(
hass: HomeAssistant, calls: list[ServiceCall] hass: HomeAssistant,
calls: list[ServiceCall],
entity_registry: er.EntityRegistry,
) -> None: ) -> None:
"""Test for selected_option conditions.""" """Test for selected_option conditions."""
entry = entity_registry.async_get_or_create(DOMAIN, "test", "5678")
assert await async_setup_component( assert await async_setup_component(
hass, hass,
automation.DOMAIN, automation.DOMAIN,
@ -128,7 +132,7 @@ async def test_if_selected_option(
"condition": "device", "condition": "device",
"domain": DOMAIN, "domain": DOMAIN,
"device_id": "", "device_id": "",
"entity_id": "select.entity", "entity_id": entry.id,
"type": "selected_option", "type": "selected_option",
"option": "option1", "option": "option1",
} }
@ -147,7 +151,7 @@ async def test_if_selected_option(
"condition": "device", "condition": "device",
"domain": DOMAIN, "domain": DOMAIN,
"device_id": "", "device_id": "",
"entity_id": "select.entity", "entity_id": entry.id,
"type": "selected_option", "type": "selected_option",
"option": "option2", "option": "option2",
} }
@ -170,7 +174,7 @@ async def test_if_selected_option(
assert len(calls) == 0 assert len(calls) == 0
hass.states.async_set( hass.states.async_set(
"select.entity", "option1", {"options": ["option1", "option2"]} entry.entity_id, "option1", {"options": ["option1", "option2"]}
) )
hass.bus.async_fire("test_event1") hass.bus.async_fire("test_event1")
hass.bus.async_fire("test_event2") hass.bus.async_fire("test_event2")
@ -179,7 +183,7 @@ async def test_if_selected_option(
assert calls[0].data["result"] == "option1 - event - test_event1" assert calls[0].data["result"] == "option1 - event - test_event1"
hass.states.async_set( hass.states.async_set(
"select.entity", "option2", {"options": ["option1", "option2"]} entry.entity_id, "option2", {"options": ["option1", "option2"]}
) )
hass.bus.async_fire("test_event1") hass.bus.async_fire("test_event1")
hass.bus.async_fire("test_event2") hass.bus.async_fire("test_event2")
@ -188,13 +192,62 @@ async def test_if_selected_option(
assert calls[1].data["result"] == "option2 - event - test_event2" assert calls[1].data["result"] == "option2 - event - test_event2"
async def test_get_condition_capabilities(hass: HomeAssistant) -> None: async def test_if_selected_option_legacy(
hass: HomeAssistant,
calls: list[ServiceCall],
entity_registry: er.EntityRegistry,
) -> None:
"""Test for selected_option conditions."""
entry = entity_registry.async_get_or_create(DOMAIN, "test", "5678")
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: [
{
"trigger": {"platform": "event", "event_type": "test_event1"},
"condition": [
{
"condition": "device",
"domain": DOMAIN,
"device_id": "",
"entity_id": entry.entity_id,
"type": "selected_option",
"option": "option1",
}
],
"action": {
"service": "test.automation",
"data": {
"result": "option1 - {{ trigger.platform }} - {{ trigger.event.event_type }}"
},
},
},
]
},
)
hass.states.async_set(
entry.entity_id, "option1", {"options": ["option1", "option2"]}
)
hass.bus.async_fire("test_event1")
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0].data["result"] == "option1 - event - test_event1"
async def test_get_condition_capabilities(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test we get the expected capabilities from a select condition.""" """Test we get the expected capabilities from a select condition."""
entry = entity_registry.async_get_or_create(DOMAIN, "test", "5678")
config = { config = {
"platform": "device", "platform": "device",
"domain": DOMAIN, "domain": DOMAIN,
"type": "selected_option", "type": "selected_option",
"entity_id": "select.test", "entity_id": entry.id,
"option": "option1", "option": "option1",
} }
@ -219,7 +272,69 @@ async def test_get_condition_capabilities(hass: HomeAssistant) -> None:
] ]
# Mock an entity # Mock an entity
hass.states.async_set("select.test", "option1", {"options": ["option1", "option2"]}) hass.states.async_set(
entry.entity_id, "option1", {"options": ["option1", "option2"]}
)
# Test if we get the right capabilities now
capabilities = await async_get_condition_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
"name": "option",
"required": True,
"type": "select",
"options": [("option1", "option1"), ("option2", "option2")],
},
{
"name": "for",
"optional": True,
"type": "positive_time_period_dict",
},
]
async def test_get_condition_capabilities_legacy(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test we get the expected capabilities from a select condition."""
entry = entity_registry.async_get_or_create(DOMAIN, "test", "5678")
config = {
"platform": "device",
"domain": DOMAIN,
"type": "selected_option",
"entity_id": entry.entity_id,
"option": "option1",
}
# Test when entity doesn't exists
capabilities = await async_get_condition_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
"name": "option",
"required": True,
"type": "select",
"options": [],
},
{
"name": "for",
"optional": True,
"type": "positive_time_period_dict",
},
]
# Mock an entity
hass.states.async_set(
entry.entity_id, "option1", {"options": ["option1", "option2"]}
)
# Test if we get the right capabilities now # Test if we get the right capabilities now
capabilities = await async_get_condition_capabilities(hass, config) capabilities = await async_get_condition_capabilities(hass, config)