diff --git a/homeassistant/components/blueprint/websocket_api.py b/homeassistant/components/blueprint/websocket_api.py index 98cc8131166..9d3329d8195 100644 --- a/homeassistant/components/blueprint/websocket_api.py +++ b/homeassistant/components/blueprint/websocket_api.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio +from collections.abc import Callable, Coroutine +import functools from typing import Any, cast import voluptuous as vol @@ -15,16 +17,50 @@ from homeassistant.util import yaml from . import importer, models from .const import DOMAIN -from .errors import FailedToLoad, FileAlreadyExists +from .errors import BlueprintException, FailedToLoad, FileAlreadyExists @callback def async_setup(hass: HomeAssistant) -> None: """Set up the websocket API.""" - websocket_api.async_register_command(hass, ws_list_blueprints) - websocket_api.async_register_command(hass, ws_import_blueprint) - websocket_api.async_register_command(hass, ws_save_blueprint) websocket_api.async_register_command(hass, ws_delete_blueprint) + websocket_api.async_register_command(hass, ws_import_blueprint) + websocket_api.async_register_command(hass, ws_list_blueprints) + websocket_api.async_register_command(hass, ws_save_blueprint) + websocket_api.async_register_command(hass, ws_substitute_blueprint) + + +def _ws_with_blueprint_domain( + func: Callable[ + [ + HomeAssistant, + websocket_api.ActiveConnection, + dict[str, Any], + models.DomainBlueprints, + ], + Coroutine[Any, Any, None], + ], +) -> websocket_api.AsyncWebSocketCommandHandler: + """Decorate a function to pass in the domain blueprints.""" + + @functools.wraps(func) + async def with_domain_blueprints( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: dict[str, Any], + ) -> None: + domain_blueprints: models.DomainBlueprints | None = hass.data.get( + DOMAIN, {} + ).get(msg["domain"]) + if domain_blueprints is None: + connection.send_error( + msg["id"], websocket_api.ERR_INVALID_FORMAT, "Unsupported domain" + ) + return + + await func(hass, connection, msg, domain_blueprints) + + return with_domain_blueprints @websocket_api.websocket_command( @@ -124,23 +160,18 @@ async def ws_import_blueprint( } ) @websocket_api.async_response +@_ws_with_blueprint_domain async def ws_save_blueprint( hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any], + domain_blueprints: models.DomainBlueprints, ) -> None: """Save a blueprint.""" path = msg["path"] domain = msg["domain"] - domain_blueprints: dict[str, models.DomainBlueprints] = hass.data.get(DOMAIN, {}) - - if domain not in domain_blueprints: - connection.send_error( - msg["id"], websocket_api.ERR_INVALID_FORMAT, "Unsupported domain" - ) - try: yaml_data = cast(dict[str, Any], yaml.parse_yaml(msg["yaml"])) blueprint = models.Blueprint(yaml_data, expected_domain=domain) @@ -154,7 +185,7 @@ async def ws_save_blueprint( path = f"{path}.yaml" try: - overrides_existing = await domain_blueprints[domain].async_add_blueprint( + overrides_existing = await domain_blueprints.async_add_blueprint( blueprint, path, allow_override=msg.get("allow_override", False) ) except FileAlreadyExists: @@ -180,25 +211,16 @@ async def ws_save_blueprint( } ) @websocket_api.async_response +@_ws_with_blueprint_domain async def ws_delete_blueprint( hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict[str, Any], + domain_blueprints: models.DomainBlueprints, ) -> None: """Delete a blueprint.""" - - path = msg["path"] - domain = msg["domain"] - - domain_blueprints: dict[str, models.DomainBlueprints] = hass.data.get(DOMAIN, {}) - - if domain not in domain_blueprints: - connection.send_error( - msg["id"], websocket_api.ERR_INVALID_FORMAT, "Unsupported domain" - ) - try: - await domain_blueprints[domain].async_remove_blueprint(path) + await domain_blueprints.async_remove_blueprint(msg["path"]) except OSError as err: connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err)) return @@ -206,3 +228,40 @@ async def ws_delete_blueprint( connection.send_result( msg["id"], ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): "blueprint/substitute", + vol.Required("domain"): cv.string, + vol.Required("path"): cv.path, + vol.Required("input"): dict, + } +) +@websocket_api.async_response +@_ws_with_blueprint_domain +async def ws_substitute_blueprint( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: dict[str, Any], + domain_blueprints: models.DomainBlueprints, +) -> None: + """Process a blueprinted config to allow editing.""" + + blueprint_config = {"use_blueprint": {"path": msg["path"], "input": msg["input"]}} + + try: + blueprint_inputs = await domain_blueprints.async_inputs_from_config( + blueprint_config + ) + except BlueprintException as err: + connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err)) + return + + try: + config = blueprint_inputs.async_substitute() + except yaml.UndefinedSubstitution as err: + connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err)) + return + + connection.send_result(msg["id"], {"substituted_config": config}) diff --git a/tests/components/blueprint/test_websocket_api.py b/tests/components/blueprint/test_websocket_api.py index 1f684b451ed..13615803569 100644 --- a/tests/components/blueprint/test_websocket_api.py +++ b/tests/components/blueprint/test_websocket_api.py @@ -9,7 +9,7 @@ import yaml from homeassistant.core import HomeAssistant from homeassistant.setup import async_setup_component -from homeassistant.util.yaml import parse_yaml +from homeassistant.util.yaml import UndefinedSubstitution, parse_yaml from tests.test_util.aiohttp import AiohttpClientMocker from tests.typing import WebSocketGenerator @@ -454,9 +454,124 @@ async def test_delete_blueprint_in_use_by_script( msg = await client.receive_json() assert not unlink_mock.mock_calls - assert msg["id"] == 9 assert not msg["success"] assert msg["error"] == { "code": "home_assistant_error", "message": "Blueprint in use", } + + +async def test_substituting_blueprint_inputs( + hass: HomeAssistant, hass_ws_client: WebSocketGenerator +) -> None: + """Test substituting blueprint inputs.""" + client = await hass_ws_client(hass) + await client.send_json_auto_id( + { + "type": "blueprint/substitute", + "domain": "automation", + "path": "test_event_service.yaml", + "input": { + "trigger_event": "test_event", + "service_to_call": "test.automation", + "a_number": 5, + }, + } + ) + + msg = await client.receive_json() + + assert msg["success"] + assert msg["result"]["substituted_config"] == { + "action": { + "entity_id": "light.kitchen", + "service": "test.automation", + }, + "trigger": { + "event_type": "test_event", + "platform": "event", + }, + } + + +async def test_substituting_blueprint_inputs_unknown_domain( + hass: HomeAssistant, hass_ws_client: WebSocketGenerator +) -> None: + """Test substituting blueprint inputs.""" + client = await hass_ws_client(hass) + await client.send_json_auto_id( + { + "type": "blueprint/substitute", + "domain": "donald_duck", + "path": "test_event_service.yaml", + "input": { + "trigger_event": "test_event", + "service_to_call": "test.automation", + "a_number": 5, + }, + } + ) + + msg = await client.receive_json() + + assert not msg["success"] + assert msg["error"] == { + "code": "invalid_format", + "message": "Unsupported domain", + } + + +async def test_substituting_blueprint_inputs_incomplete_input( + hass: HomeAssistant, hass_ws_client: WebSocketGenerator +) -> None: + """Test substituting blueprint inputs.""" + client = await hass_ws_client(hass) + await client.send_json_auto_id( + { + "type": "blueprint/substitute", + "domain": "automation", + "path": "test_event_service.yaml", + "input": { + "service_to_call": "test.automation", + "a_number": 5, + }, + } + ) + + msg = await client.receive_json() + + assert not msg["success"] + assert msg["error"] == { + "code": "unknown_error", + "message": "Missing input trigger_event", + } + + +async def test_substituting_blueprint_inputs_incomplete_input_2( + hass: HomeAssistant, hass_ws_client: WebSocketGenerator +) -> None: + """Test substituting blueprint inputs.""" + client = await hass_ws_client(hass) + with patch( + "homeassistant.components.blueprint.models.BlueprintInputs.async_substitute", + side_effect=UndefinedSubstitution("blah"), + ): + await client.send_json_auto_id( + { + "type": "blueprint/substitute", + "domain": "automation", + "path": "test_event_service.yaml", + "input": { + "trigger_event": "test_event", + "service_to_call": "test.automation", + "a_number": 5, + }, + } + ) + msg = await client.receive_json() + + assert not msg["success"] + assert msg["error"] == { + "code": "unknown_error", + "message": "No substitution found for input blah", + }