Shell command response (#96695)

* Add service response to shell_commands

* Add shell_command response tests

* Fix mypy

* Return empty dict instead of None on error

* Improved response type hint

* Cleanup after removing type cast

* Raise exceptions i.s.o. returning

* Fix ruff
This commit is contained in:
RoboMagus 2023-07-20 11:53:57 +02:00 committed by GitHub
parent 0ba2531ca4
commit c433b251fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 11 deletions

View File

@ -9,10 +9,16 @@ import shlex
import async_timeout
import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.core import (
HomeAssistant,
ServiceCall,
ServiceResponse,
SupportsResponse,
)
from homeassistant.exceptions import TemplateError
from homeassistant.helpers import config_validation as cv, template
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.json import JsonObjectType
DOMAIN = "shell_command"
@ -31,7 +37,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
cache: dict[str, tuple[str, str | None, template.Template | None]] = {}
async def async_service_handler(service: ServiceCall) -> None:
async def async_service_handler(service: ServiceCall) -> ServiceResponse:
"""Execute a shell command service."""
cmd = conf[service.service]
@ -54,7 +60,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
)
except TemplateError as ex:
_LOGGER.exception("Error rendering command template: %s", ex)
return
raise
else:
rendered_args = None
@ -97,9 +103,16 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
process._transport.close() # type: ignore[attr-defined]
del process
return
raise
service_response: JsonObjectType = {
"stdout": "",
"stderr": "",
"returncode": process.returncode,
}
if stdout_data:
service_response["stdout"] = stdout_data.decode("utf-8").strip()
_LOGGER.debug(
"Stdout of command: `%s`, return code: %s:\n%s",
cmd,
@ -107,6 +120,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
stdout_data,
)
if stderr_data:
service_response["stderr"] = stderr_data.decode("utf-8").strip()
_LOGGER.debug(
"Stderr of command: `%s`, return code: %s:\n%s",
cmd,
@ -118,6 +132,13 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"Error running command: `%s`, return code: %s", cmd, process.returncode
)
return service_response
for name in conf:
hass.services.async_register(DOMAIN, name, async_service_handler)
hass.services.async_register(
DOMAIN,
name,
async_service_handler,
supports_response=SupportsResponse.OPTIONAL,
)
return True

View File

@ -10,6 +10,7 @@ import pytest
from homeassistant.components import shell_command
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import TemplateError
from homeassistant.setup import async_setup_component
@ -83,6 +84,28 @@ async def test_template_render_no_template(mock_call, hass: HomeAssistant) -> No
assert cmd == "ls /bin"
@patch("homeassistant.components.shell_command.asyncio.create_subprocess_shell")
async def test_incorrect_template(mock_call, hass: HomeAssistant) -> None:
"""Ensure shell_commands with invalid templates are handled properly."""
mock_call.return_value = mock_process_creator(error=False)
assert await async_setup_component(
hass,
shell_command.DOMAIN,
{
shell_command.DOMAIN: {
"test_service": ("ls /bin {{ states['invalid/domain'] }}")
}
},
)
with pytest.raises(TemplateError):
await hass.services.async_call(
"shell_command", "test_service", blocking=True, return_response=True
)
await hass.async_block_till_done()
@patch("homeassistant.components.shell_command.asyncio.create_subprocess_exec")
async def test_template_render(mock_call, hass: HomeAssistant) -> None:
"""Ensure shell_commands with templates get rendered properly."""
@ -120,11 +143,14 @@ async def test_subprocess_error(mock_error, mock_call, hass: HomeAssistant) -> N
{shell_command.DOMAIN: {"test_service": f"touch {path}"}},
)
await hass.services.async_call("shell_command", "test_service", blocking=True)
response = await hass.services.async_call(
"shell_command", "test_service", blocking=True, return_response=True
)
await hass.async_block_till_done()
assert mock_call.call_count == 1
assert mock_error.call_count == 1
assert not os.path.isfile(path)
assert response["returncode"] == 1
@patch("homeassistant.components.shell_command._LOGGER.debug")
@ -137,11 +163,15 @@ async def test_stdout_captured(mock_output, hass: HomeAssistant) -> None:
{shell_command.DOMAIN: {"test_service": f"echo {test_phrase}"}},
)
await hass.services.async_call("shell_command", "test_service", blocking=True)
response = await hass.services.async_call(
"shell_command", "test_service", blocking=True, return_response=True
)
await hass.async_block_till_done()
assert mock_output.call_count == 1
assert test_phrase.encode() + b"\n" == mock_output.call_args_list[0][0][-1]
assert response["stdout"] == test_phrase
assert response["returncode"] == 0
@patch("homeassistant.components.shell_command._LOGGER.debug")
@ -154,11 +184,14 @@ async def test_stderr_captured(mock_output, hass: HomeAssistant) -> None:
{shell_command.DOMAIN: {"test_service": f">&2 echo {test_phrase}"}},
)
await hass.services.async_call("shell_command", "test_service", blocking=True)
response = await hass.services.async_call(
"shell_command", "test_service", blocking=True, return_response=True
)
await hass.async_block_till_done()
assert mock_output.call_count == 1
assert test_phrase.encode() + b"\n" == mock_output.call_args_list[0][0][-1]
assert response["stderr"] == test_phrase
async def test_do_not_run_forever(
@ -187,9 +220,13 @@ async def test_do_not_run_forever(
"homeassistant.components.shell_command.asyncio.create_subprocess_shell",
side_effect=mock_create_subprocess_shell,
):
await hass.services.async_call(
shell_command.DOMAIN, "test_service", blocking=True
)
with pytest.raises(asyncio.TimeoutError):
await hass.services.async_call(
shell_command.DOMAIN,
"test_service",
blocking=True,
return_response=True,
)
await hass.async_block_till_done()
mock_process.kill.assert_called_once()