From f4e503627574d28d12b2489abc4c8868b1ab1ab9 Mon Sep 17 00:00:00 2001 From: G Johansson Date: Wed, 11 Jun 2025 19:58:28 +0200 Subject: [PATCH] New helper for templating args in command_line (#145899) --- .../components/command_line/notify.py | 27 ++--------- .../components/command_line/sensor.py | 33 ++----------- .../components/command_line/utils.py | 47 +++++++++++++++---- tests/components/command_line/test_notify.py | 3 +- 4 files changed, 48 insertions(+), 62 deletions(-) diff --git a/homeassistant/components/command_line/notify.py b/homeassistant/components/command_line/notify.py index 50bfbe651ef..b0031e4d5ee 100644 --- a/homeassistant/components/command_line/notify.py +++ b/homeassistant/components/command_line/notify.py @@ -9,12 +9,11 @@ from typing import Any from homeassistant.components.notify import BaseNotificationService from homeassistant.const import CONF_COMMAND from homeassistant.core import HomeAssistant -from homeassistant.exceptions import TemplateError -from homeassistant.helpers.template import Template from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.util.process import kill_subprocess from .const import CONF_COMMAND_TIMEOUT, LOGGER +from .utils import render_template_args _LOGGER = logging.getLogger(__name__) @@ -45,28 +44,10 @@ class CommandLineNotificationService(BaseNotificationService): def send_message(self, message: str = "", **kwargs: Any) -> None: """Send a message to a command line.""" - command = self.command - if " " not in command: - prog = command - args = None - args_compiled = None - else: - prog, args = command.split(" ", 1) - args_compiled = Template(args, self.hass) + if not (command := render_template_args(self.hass, self.command)): + return - rendered_args = None - if args_compiled: - args_to_render = {"arguments": args} - try: - rendered_args = args_compiled.async_render(args_to_render) - except TemplateError as ex: - LOGGER.exception("Error rendering command template: %s", ex) - return - - if rendered_args != args: - command = f"{prog} {rendered_args}" - - LOGGER.debug("Running command: %s, with message: %s", command, message) + LOGGER.debug("Running with message: %s", message) with subprocess.Popen( # noqa: S602 # shell by design command, diff --git a/homeassistant/components/command_line/sensor.py b/homeassistant/components/command_line/sensor.py index 5ce50edc4e7..dfc31b4581b 100644 --- a/homeassistant/components/command_line/sensor.py +++ b/homeassistant/components/command_line/sensor.py @@ -19,7 +19,6 @@ from homeassistant.const import ( CONF_VALUE_TEMPLATE, ) from homeassistant.core import HomeAssistant -from homeassistant.exceptions import TemplateError from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.template import Template @@ -37,7 +36,7 @@ from .const import ( LOGGER, TRIGGER_ENTITY_OPTIONS, ) -from .utils import async_check_output_or_log +from .utils import async_check_output_or_log, render_template_args DEFAULT_NAME = "Command Sensor" @@ -222,32 +221,6 @@ class CommandSensorData: async def async_update(self) -> None: """Get the latest data with a shell command.""" - command = self.command - - if " " not in command: - prog = command - args = None - args_compiled = None - else: - prog, args = command.split(" ", 1) - args_compiled = Template(args, self.hass) - - if args_compiled: - try: - args_to_render = {"arguments": args} - rendered_args = args_compiled.async_render(args_to_render) - except TemplateError as ex: - LOGGER.exception("Error rendering command template: %s", ex) - return - else: - rendered_args = None - - if rendered_args == args: - # No template used. default behavior - pass - else: - # Template used. Construct the string used in the shell - command = f"{prog} {rendered_args}" - - LOGGER.debug("Running command: %s", command) + if not (command := render_template_args(self.hass, self.command)): + return self.value = await async_check_output_or_log(command, self.timeout) diff --git a/homeassistant/components/command_line/utils.py b/homeassistant/components/command_line/utils.py index c1926546950..607340c4853 100644 --- a/homeassistant/components/command_line/utils.py +++ b/homeassistant/components/command_line/utils.py @@ -3,9 +3,13 @@ from __future__ import annotations import asyncio -import logging -_LOGGER = logging.getLogger(__name__) +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import TemplateError +from homeassistant.helpers.template import Template + +from .const import LOGGER + _EXEC_FAILED_CODE = 127 @@ -18,7 +22,7 @@ async def async_call_shell_with_timeout( return code is returned. """ try: - _LOGGER.debug("Running command: %s", command) + LOGGER.debug("Running command: %s", command) proc = await asyncio.create_subprocess_shell( # shell by design command, close_fds=False, # required for posix_spawn @@ -26,14 +30,14 @@ async def async_call_shell_with_timeout( async with asyncio.timeout(timeout): await proc.communicate() except TimeoutError: - _LOGGER.error("Timeout for command: %s", command) + LOGGER.error("Timeout for command: %s", command) return -1 return_code = proc.returncode if return_code == _EXEC_FAILED_CODE: - _LOGGER.error("Error trying to exec command: %s", command) + LOGGER.error("Error trying to exec command: %s", command) elif log_return_code and return_code != 0: - _LOGGER.error( + LOGGER.error( "Command failed (with return code %s): %s", proc.returncode, command, @@ -53,12 +57,39 @@ async def async_check_output_or_log(command: str, timeout: int) -> str | None: stdout, _ = await proc.communicate() if proc.returncode != 0: - _LOGGER.error( + LOGGER.error( "Command failed (with return code %s): %s", proc.returncode, command ) else: return stdout.strip().decode("utf-8") except TimeoutError: - _LOGGER.error("Timeout for command: %s", command) + LOGGER.error("Timeout for command: %s", command) return None + + +def render_template_args(hass: HomeAssistant, command: str) -> str | None: + """Render template arguments for command line utilities.""" + if " " not in command: + prog = command + args = None + args_compiled = None + else: + prog, args = command.split(" ", 1) + args_compiled = Template(args, hass) + + rendered_args = None + if args_compiled: + args_to_render = {"arguments": args} + try: + rendered_args = args_compiled.async_render(args_to_render) + except TemplateError as ex: + LOGGER.exception("Error rendering command template: %s", ex) + return None + + if rendered_args != args: + command = f"{prog} {rendered_args}" + + LOGGER.debug("Running command: %s", command) + + return command diff --git a/tests/components/command_line/test_notify.py b/tests/components/command_line/test_notify.py index a0c69765c9a..30523e8c740 100644 --- a/tests/components/command_line/test_notify.py +++ b/tests/components/command_line/test_notify.py @@ -126,7 +126,8 @@ async def test_command_line_output_single_command( await hass.services.async_call( NOTIFY_DOMAIN, "test3", {"message": "test message"}, blocking=True ) - assert "Running command: echo, with message: test message" in caplog.text + assert "Running command: echo" in caplog.text + assert "Running with message: test message" in caplog.text async def test_command_template(hass: HomeAssistant) -> None: