New helper for templating args in command_line (#145899)

This commit is contained in:
G Johansson 2025-06-11 19:58:28 +02:00 committed by GitHub
parent 59aba339d8
commit f4e5036275
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 48 additions and 62 deletions

View File

@ -9,12 +9,11 @@ from typing import Any
from homeassistant.components.notify import BaseNotificationService from homeassistant.components.notify import BaseNotificationService
from homeassistant.const import CONF_COMMAND from homeassistant.const import CONF_COMMAND
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util.process import kill_subprocess from homeassistant.util.process import kill_subprocess
from .const import CONF_COMMAND_TIMEOUT, LOGGER from .const import CONF_COMMAND_TIMEOUT, LOGGER
from .utils import render_template_args
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -45,28 +44,10 @@ class CommandLineNotificationService(BaseNotificationService):
def send_message(self, message: str = "", **kwargs: Any) -> None: def send_message(self, message: str = "", **kwargs: Any) -> None:
"""Send a message to a command line.""" """Send a message to a command line."""
command = self.command if not (command := render_template_args(self.hass, self.command)):
if " " not in command: return
prog = command
args = None
args_compiled = None
else:
prog, args = command.split(" ", 1)
args_compiled = Template(args, self.hass)
rendered_args = None LOGGER.debug("Running with message: %s", message)
if args_compiled:
args_to_render = {"arguments": args}
try:
rendered_args = args_compiled.async_render(args_to_render)
except TemplateError as ex:
LOGGER.exception("Error rendering command template: %s", ex)
return
if rendered_args != args:
command = f"{prog} {rendered_args}"
LOGGER.debug("Running command: %s, with message: %s", command, message)
with subprocess.Popen( # noqa: S602 # shell by design with subprocess.Popen( # noqa: S602 # shell by design
command, command,

View File

@ -19,7 +19,6 @@ from homeassistant.const import (
CONF_VALUE_TEMPLATE, CONF_VALUE_TEMPLATE,
) )
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_time_interval from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.template import Template from homeassistant.helpers.template import Template
@ -37,7 +36,7 @@ from .const import (
LOGGER, LOGGER,
TRIGGER_ENTITY_OPTIONS, TRIGGER_ENTITY_OPTIONS,
) )
from .utils import async_check_output_or_log from .utils import async_check_output_or_log, render_template_args
DEFAULT_NAME = "Command Sensor" DEFAULT_NAME = "Command Sensor"
@ -222,32 +221,6 @@ class CommandSensorData:
async def async_update(self) -> None: async def async_update(self) -> None:
"""Get the latest data with a shell command.""" """Get the latest data with a shell command."""
command = self.command if not (command := render_template_args(self.hass, self.command)):
return
if " " not in command:
prog = command
args = None
args_compiled = None
else:
prog, args = command.split(" ", 1)
args_compiled = Template(args, self.hass)
if args_compiled:
try:
args_to_render = {"arguments": args}
rendered_args = args_compiled.async_render(args_to_render)
except TemplateError as ex:
LOGGER.exception("Error rendering command template: %s", ex)
return
else:
rendered_args = None
if rendered_args == args:
# No template used. default behavior
pass
else:
# Template used. Construct the string used in the shell
command = f"{prog} {rendered_args}"
LOGGER.debug("Running command: %s", command)
self.value = await async_check_output_or_log(command, self.timeout) self.value = await async_check_output_or_log(command, self.timeout)

View File

@ -3,9 +3,13 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
_LOGGER = logging.getLogger(__name__) from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.helpers.template import Template
from .const import LOGGER
_EXEC_FAILED_CODE = 127 _EXEC_FAILED_CODE = 127
@ -18,7 +22,7 @@ async def async_call_shell_with_timeout(
return code is returned. return code is returned.
""" """
try: try:
_LOGGER.debug("Running command: %s", command) LOGGER.debug("Running command: %s", command)
proc = await asyncio.create_subprocess_shell( # shell by design proc = await asyncio.create_subprocess_shell( # shell by design
command, command,
close_fds=False, # required for posix_spawn close_fds=False, # required for posix_spawn
@ -26,14 +30,14 @@ async def async_call_shell_with_timeout(
async with asyncio.timeout(timeout): async with asyncio.timeout(timeout):
await proc.communicate() await proc.communicate()
except TimeoutError: except TimeoutError:
_LOGGER.error("Timeout for command: %s", command) LOGGER.error("Timeout for command: %s", command)
return -1 return -1
return_code = proc.returncode return_code = proc.returncode
if return_code == _EXEC_FAILED_CODE: if return_code == _EXEC_FAILED_CODE:
_LOGGER.error("Error trying to exec command: %s", command) LOGGER.error("Error trying to exec command: %s", command)
elif log_return_code and return_code != 0: elif log_return_code and return_code != 0:
_LOGGER.error( LOGGER.error(
"Command failed (with return code %s): %s", "Command failed (with return code %s): %s",
proc.returncode, proc.returncode,
command, command,
@ -53,12 +57,39 @@ async def async_check_output_or_log(command: str, timeout: int) -> str | None:
stdout, _ = await proc.communicate() stdout, _ = await proc.communicate()
if proc.returncode != 0: if proc.returncode != 0:
_LOGGER.error( LOGGER.error(
"Command failed (with return code %s): %s", proc.returncode, command "Command failed (with return code %s): %s", proc.returncode, command
) )
else: else:
return stdout.strip().decode("utf-8") return stdout.strip().decode("utf-8")
except TimeoutError: except TimeoutError:
_LOGGER.error("Timeout for command: %s", command) LOGGER.error("Timeout for command: %s", command)
return None return None
def render_template_args(hass: HomeAssistant, command: str) -> str | None:
"""Render template arguments for command line utilities."""
if " " not in command:
prog = command
args = None
args_compiled = None
else:
prog, args = command.split(" ", 1)
args_compiled = Template(args, hass)
rendered_args = None
if args_compiled:
args_to_render = {"arguments": args}
try:
rendered_args = args_compiled.async_render(args_to_render)
except TemplateError as ex:
LOGGER.exception("Error rendering command template: %s", ex)
return None
if rendered_args != args:
command = f"{prog} {rendered_args}"
LOGGER.debug("Running command: %s", command)
return command

View File

@ -126,7 +126,8 @@ async def test_command_line_output_single_command(
await hass.services.async_call( await hass.services.async_call(
NOTIFY_DOMAIN, "test3", {"message": "test message"}, blocking=True NOTIFY_DOMAIN, "test3", {"message": "test message"}, blocking=True
) )
assert "Running command: echo, with message: test message" in caplog.text assert "Running command: echo" in caplog.text
assert "Running with message: test message" in caplog.text
async def test_command_template(hass: HomeAssistant) -> None: async def test_command_template(hass: HomeAssistant) -> None: