Add WS command blueprint/substitute (#119890)

This commit is contained in:
Erik Montnemery 2024-06-25 20:15:11 +02:00 committed by GitHub
parent 9dc26652ee
commit 7d2ae5b3a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 200 additions and 26 deletions

View File

@ -3,6 +3,8 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
import functools
from typing import Any, cast
import voluptuous as vol
@ -15,16 +17,50 @@ from homeassistant.util import yaml
from . import importer, models
from .const import DOMAIN
from .errors import FailedToLoad, FileAlreadyExists
from .errors import BlueprintException, FailedToLoad, FileAlreadyExists
@callback
def async_setup(hass: HomeAssistant) -> None:
"""Set up the websocket API."""
websocket_api.async_register_command(hass, ws_list_blueprints)
websocket_api.async_register_command(hass, ws_import_blueprint)
websocket_api.async_register_command(hass, ws_save_blueprint)
websocket_api.async_register_command(hass, ws_delete_blueprint)
websocket_api.async_register_command(hass, ws_import_blueprint)
websocket_api.async_register_command(hass, ws_list_blueprints)
websocket_api.async_register_command(hass, ws_save_blueprint)
websocket_api.async_register_command(hass, ws_substitute_blueprint)
def _ws_with_blueprint_domain(
func: Callable[
[
HomeAssistant,
websocket_api.ActiveConnection,
dict[str, Any],
models.DomainBlueprints,
],
Coroutine[Any, Any, None],
],
) -> websocket_api.AsyncWebSocketCommandHandler:
"""Decorate a function to pass in the domain blueprints."""
@functools.wraps(func)
async def with_domain_blueprints(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
domain_blueprints: models.DomainBlueprints | None = hass.data.get(
DOMAIN, {}
).get(msg["domain"])
if domain_blueprints is None:
connection.send_error(
msg["id"], websocket_api.ERR_INVALID_FORMAT, "Unsupported domain"
)
return
await func(hass, connection, msg, domain_blueprints)
return with_domain_blueprints
@websocket_api.websocket_command(
@ -124,23 +160,18 @@ async def ws_import_blueprint(
}
)
@websocket_api.async_response
@_ws_with_blueprint_domain
async def ws_save_blueprint(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
domain_blueprints: models.DomainBlueprints,
) -> None:
"""Save a blueprint."""
path = msg["path"]
domain = msg["domain"]
domain_blueprints: dict[str, models.DomainBlueprints] = hass.data.get(DOMAIN, {})
if domain not in domain_blueprints:
connection.send_error(
msg["id"], websocket_api.ERR_INVALID_FORMAT, "Unsupported domain"
)
try:
yaml_data = cast(dict[str, Any], yaml.parse_yaml(msg["yaml"]))
blueprint = models.Blueprint(yaml_data, expected_domain=domain)
@ -154,7 +185,7 @@ async def ws_save_blueprint(
path = f"{path}.yaml"
try:
overrides_existing = await domain_blueprints[domain].async_add_blueprint(
overrides_existing = await domain_blueprints.async_add_blueprint(
blueprint, path, allow_override=msg.get("allow_override", False)
)
except FileAlreadyExists:
@ -180,25 +211,16 @@ async def ws_save_blueprint(
}
)
@websocket_api.async_response
@_ws_with_blueprint_domain
async def ws_delete_blueprint(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
domain_blueprints: models.DomainBlueprints,
) -> None:
"""Delete a blueprint."""
path = msg["path"]
domain = msg["domain"]
domain_blueprints: dict[str, models.DomainBlueprints] = hass.data.get(DOMAIN, {})
if domain not in domain_blueprints:
connection.send_error(
msg["id"], websocket_api.ERR_INVALID_FORMAT, "Unsupported domain"
)
try:
await domain_blueprints[domain].async_remove_blueprint(path)
await domain_blueprints.async_remove_blueprint(msg["path"])
except OSError as err:
connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err))
return
@ -206,3 +228,40 @@ async def ws_delete_blueprint(
connection.send_result(
msg["id"],
)
@websocket_api.websocket_command(
{
vol.Required("type"): "blueprint/substitute",
vol.Required("domain"): cv.string,
vol.Required("path"): cv.path,
vol.Required("input"): dict,
}
)
@websocket_api.async_response
@_ws_with_blueprint_domain
async def ws_substitute_blueprint(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
domain_blueprints: models.DomainBlueprints,
) -> None:
"""Process a blueprinted config to allow editing."""
blueprint_config = {"use_blueprint": {"path": msg["path"], "input": msg["input"]}}
try:
blueprint_inputs = await domain_blueprints.async_inputs_from_config(
blueprint_config
)
except BlueprintException as err:
connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err))
return
try:
config = blueprint_inputs.async_substitute()
except yaml.UndefinedSubstitution as err:
connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err))
return
connection.send_result(msg["id"], {"substituted_config": config})

View File

@ -9,7 +9,7 @@ import yaml
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from homeassistant.util.yaml import parse_yaml
from homeassistant.util.yaml import UndefinedSubstitution, parse_yaml
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import WebSocketGenerator
@ -454,9 +454,124 @@ async def test_delete_blueprint_in_use_by_script(
msg = await client.receive_json()
assert not unlink_mock.mock_calls
assert msg["id"] == 9
assert not msg["success"]
assert msg["error"] == {
"code": "home_assistant_error",
"message": "Blueprint in use",
}
async def test_substituting_blueprint_inputs(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test substituting blueprint inputs."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "blueprint/substitute",
"domain": "automation",
"path": "test_event_service.yaml",
"input": {
"trigger_event": "test_event",
"service_to_call": "test.automation",
"a_number": 5,
},
}
)
msg = await client.receive_json()
assert msg["success"]
assert msg["result"]["substituted_config"] == {
"action": {
"entity_id": "light.kitchen",
"service": "test.automation",
},
"trigger": {
"event_type": "test_event",
"platform": "event",
},
}
async def test_substituting_blueprint_inputs_unknown_domain(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test substituting blueprint inputs."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "blueprint/substitute",
"domain": "donald_duck",
"path": "test_event_service.yaml",
"input": {
"trigger_event": "test_event",
"service_to_call": "test.automation",
"a_number": 5,
},
}
)
msg = await client.receive_json()
assert not msg["success"]
assert msg["error"] == {
"code": "invalid_format",
"message": "Unsupported domain",
}
async def test_substituting_blueprint_inputs_incomplete_input(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test substituting blueprint inputs."""
client = await hass_ws_client(hass)
await client.send_json_auto_id(
{
"type": "blueprint/substitute",
"domain": "automation",
"path": "test_event_service.yaml",
"input": {
"service_to_call": "test.automation",
"a_number": 5,
},
}
)
msg = await client.receive_json()
assert not msg["success"]
assert msg["error"] == {
"code": "unknown_error",
"message": "Missing input trigger_event",
}
async def test_substituting_blueprint_inputs_incomplete_input_2(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test substituting blueprint inputs."""
client = await hass_ws_client(hass)
with patch(
"homeassistant.components.blueprint.models.BlueprintInputs.async_substitute",
side_effect=UndefinedSubstitution("blah"),
):
await client.send_json_auto_id(
{
"type": "blueprint/substitute",
"domain": "automation",
"path": "test_event_service.yaml",
"input": {
"trigger_event": "test_event",
"service_to_call": "test.automation",
"a_number": 5,
},
}
)
msg = await client.receive_json()
assert not msg["success"]
assert msg["error"] == {
"code": "unknown_error",
"message": "No substitution found for input blah",
}