Convert command_line to use asyncio for subprocesses (#111927)

* Convert command_line to use asyncio for subprocesses

* fixes

* fix

* fixes

* more test fixes

* more fixes

* fixes

* preen
This commit is contained in:
J. Nick Koston 2024-03-01 18:15:10 -10:00 committed by GitHub
parent 5f65315e86
commit c0f7ade92b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 115 additions and 117 deletions

View File

@ -148,7 +148,7 @@ class CommandBinarySensor(ManualTriggerEntity, BinarySensorEntity):
async def _async_update(self) -> None:
"""Get the latest data and updates the state."""
await self.hass.async_add_executor_job(self.data.update)
await self.data.async_update()
value = self.data.value
if self._value_template is not None:

View File

@ -28,7 +28,7 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util, slugify
from .const import CONF_COMMAND_TIMEOUT, LOGGER
from .utils import call_shell_with_timeout, check_output_or_log
from .utils import async_call_shell_with_timeout, async_check_output_or_log
SCAN_INTERVAL = timedelta(seconds=15)
@ -114,11 +114,11 @@ class CommandCover(ManualTriggerEntity, CoverEntity):
),
)
def _move_cover(self, command: str) -> bool:
async def _async_move_cover(self, command: str) -> bool:
"""Execute the actual commands."""
LOGGER.info("Running command: %s", command)
returncode = call_shell_with_timeout(command, self._timeout)
returncode = await async_call_shell_with_timeout(command, self._timeout)
success = returncode == 0
if not success:
@ -143,11 +143,11 @@ class CommandCover(ManualTriggerEntity, CoverEntity):
"""
return self._state
def _query_state(self) -> str | None:
async def _async_query_state(self) -> str | None:
"""Query for the state."""
if self._command_state:
LOGGER.info("Running state value command: %s", self._command_state)
return check_output_or_log(self._command_state, self._timeout)
return await async_check_output_or_log(self._command_state, self._timeout)
if TYPE_CHECKING:
return None
@ -169,7 +169,7 @@ class CommandCover(ManualTriggerEntity, CoverEntity):
async def _async_update(self) -> None:
"""Update device state."""
if self._command_state:
payload = str(await self.hass.async_add_executor_job(self._query_state))
payload = str(await self._async_query_state())
if self._value_template:
payload = self._value_template.async_render_with_possible_json_value(
payload, None
@ -189,15 +189,15 @@ class CommandCover(ManualTriggerEntity, CoverEntity):
async def async_open_cover(self, **kwargs: Any) -> None:
"""Open the cover."""
await self.hass.async_add_executor_job(self._move_cover, self._command_open)
await self._async_move_cover(self._command_open)
await self._update_entity_state()
async def async_close_cover(self, **kwargs: Any) -> None:
"""Close the cover."""
await self.hass.async_add_executor_job(self._move_cover, self._command_close)
await self._async_move_cover(self._command_close)
await self._update_entity_state()
async def async_stop_cover(self, **kwargs: Any) -> None:
"""Stop the cover."""
await self.hass.async_add_executor_job(self._move_cover, self._command_stop)
await self._async_move_cover(self._command_stop)
await self._update_entity_state()

View File

@ -33,7 +33,7 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util
from .const import CONF_COMMAND_TIMEOUT, LOGGER
from .utils import check_output_or_log
from .utils import async_check_output_or_log
CONF_JSON_ATTRIBUTES = "json_attributes"
@ -138,6 +138,7 @@ class CommandSensor(ManualTriggerSensorEntity):
"""Update the state of the entity."""
if self._process_updates is None:
self._process_updates = asyncio.Lock()
if self._process_updates.locked():
LOGGER.warning(
"Updating Command Line Sensor %s took longer than the scheduled update interval %s",
@ -151,7 +152,7 @@ class CommandSensor(ManualTriggerSensorEntity):
async def _async_update(self) -> None:
"""Get the latest data and updates the state."""
await self.hass.async_add_executor_job(self.data.update)
await self.data.async_update()
value = self.data.value
if self._json_attributes:
@ -216,7 +217,7 @@ class CommandSensorData:
self.command = command
self.timeout = command_timeout
def update(self) -> None:
async def async_update(self) -> None:
"""Get the latest data with a shell command."""
command = self.command
@ -231,7 +232,7 @@ class CommandSensorData:
if args_compiled:
try:
args_to_render = {"arguments": args}
rendered_args = args_compiled.render(args_to_render)
rendered_args = args_compiled.async_render(args_to_render)
except TemplateError as ex:
LOGGER.exception("Error rendering command template: %s", ex)
return
@ -246,4 +247,4 @@ class CommandSensorData:
command = f"{prog} {rendered_args}"
LOGGER.debug("Running command: %s", command)
self.value = check_output_or_log(command, self.timeout)
self.value = await async_check_output_or_log(command, self.timeout)

View File

@ -28,7 +28,7 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import dt as dt_util, slugify
from .const import CONF_COMMAND_TIMEOUT, LOGGER
from .utils import call_shell_with_timeout, check_output_or_log
from .utils import async_call_shell_with_timeout, async_check_output_or_log
SCAN_INTERVAL = timedelta(seconds=30)
@ -121,28 +121,26 @@ class CommandSwitch(ManualTriggerEntity, SwitchEntity):
"""Execute the actual commands."""
LOGGER.info("Running command: %s", command)
success = (
await self.hass.async_add_executor_job(
call_shell_with_timeout, command, self._timeout
)
== 0
)
success = await async_call_shell_with_timeout(command, self._timeout) == 0
if not success:
LOGGER.error("Command failed: %s", command)
return success
def _query_state_value(self, command: str) -> str | None:
async def _async_query_state_value(self, command: str) -> str | None:
"""Execute state command for return value."""
LOGGER.info("Running state value command: %s", command)
return check_output_or_log(command, self._timeout)
return await async_check_output_or_log(command, self._timeout)
def _query_state_code(self, command: str) -> bool:
async def _async_query_state_code(self, command: str) -> bool:
"""Execute state command for return code."""
LOGGER.info("Running state code command: %s", command)
return (
call_shell_with_timeout(command, self._timeout, log_return_code=False) == 0
await async_call_shell_with_timeout(
command, self._timeout, log_return_code=False
)
== 0
)
@property
@ -150,12 +148,12 @@ class CommandSwitch(ManualTriggerEntity, SwitchEntity):
"""Return true if we do optimistic updates."""
return self._command_state is None
def _query_state(self) -> str | int | None:
async def _async_query_state(self) -> str | int | None:
"""Query for state."""
if self._command_state:
if self._value_template:
return self._query_state_value(self._command_state)
return self._query_state_code(self._command_state)
return await self._async_query_state_value(self._command_state)
return await self._async_query_state_code(self._command_state)
if TYPE_CHECKING:
return None
@ -177,7 +175,7 @@ class CommandSwitch(ManualTriggerEntity, SwitchEntity):
async def _async_update(self) -> None:
"""Update device state."""
if self._command_state:
payload = str(await self.hass.async_add_executor_job(self._query_state))
payload = str(await self._async_query_state())
value = None
if self._value_template:
value = self._value_template.async_render_with_possible_json_value(

View File

@ -1,13 +1,14 @@
"""The command_line component utils."""
from __future__ import annotations
import asyncio
import logging
import subprocess
_LOGGER = logging.getLogger(__name__)
_EXEC_FAILED_CODE = 127
def call_shell_with_timeout(
async def async_call_shell_with_timeout(
command: str, timeout: int, *, log_return_code: bool = True
) -> int:
"""Run a shell command with a timeout.
@ -17,46 +18,45 @@ def call_shell_with_timeout(
"""
try:
_LOGGER.debug("Running command: %s", command)
subprocess.check_output(
proc = await asyncio.create_subprocess_shell( # noqa: S602 # shell by design
command,
shell=True, # noqa: S602 # shell by design
timeout=timeout,
close_fds=False, # required for posix_spawn
)
return 0
except subprocess.CalledProcessError as proc_exception:
if log_return_code:
async with asyncio.timeout(timeout):
await proc.communicate()
return_code = proc.returncode
if return_code == _EXEC_FAILED_CODE:
_LOGGER.error("Error trying to exec command: %s", command)
elif log_return_code and return_code != 0:
_LOGGER.error(
"Command failed (with return code %s): %s",
proc_exception.returncode,
proc.returncode,
command,
)
return proc_exception.returncode
except subprocess.TimeoutExpired:
return return_code or 0
except TimeoutError:
_LOGGER.error("Timeout for command: %s", command)
return -1
except subprocess.SubprocessError:
_LOGGER.error("Error trying to exec command: %s", command)
return -1
def check_output_or_log(command: str, timeout: int) -> str | None:
async def async_check_output_or_log(command: str, timeout: int) -> str | None:
"""Run a shell command with a timeout and return the output."""
try:
return_value = subprocess.check_output(
proc = await asyncio.create_subprocess_shell( # noqa: S602 # shell by design
command,
shell=True, # noqa: S602 # shell by design
timeout=timeout,
close_fds=False, # required for posix_spawn
stdout=asyncio.subprocess.PIPE,
)
return return_value.strip().decode("utf-8")
except subprocess.CalledProcessError as err:
_LOGGER.error(
"Command failed (with return code %s): %s", err.returncode, command
)
except subprocess.TimeoutExpired:
async with asyncio.timeout(timeout):
stdout, _ = await proc.communicate()
if proc.returncode != 0:
_LOGGER.error(
"Command failed (with return code %s): %s", proc.returncode, command
)
else:
return stdout.strip().decode("utf-8")
except TimeoutError:
_LOGGER.error("Timeout for command: %s", command)
except subprocess.SubprocessError:
_LOGGER.error("Error trying to exec command: %s", command)
return None

View File

@ -1 +1,30 @@
"""Tests for command_line component."""
import asyncio
from contextlib import contextmanager
from unittest.mock import MagicMock, patch
@contextmanager
def mock_asyncio_subprocess_run(
response: bytes = b"", returncode: int = 0, exception: Exception = None
):
"""Mock create_subprocess_shell."""
class MockProcess(asyncio.subprocess.Process):
@property
def returncode(self):
return returncode
async def communicate(self):
if exception:
raise exception
return response, b""
mock_process = MockProcess(MagicMock(), MagicMock(), MagicMock())
with patch(
"homeassistant.components.command_line.utils.asyncio.create_subprocess_shell",
return_value=mock_process,
) as mock:
yield mock

View File

@ -21,6 +21,8 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt as dt_util
from . import mock_asyncio_subprocess_run
from tests.common import async_fire_time_changed
@ -329,10 +331,7 @@ async def test_availability(
hass.states.async_set("sensor.input1", "off")
await hass.async_block_till_done()
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"0",
):
with mock_asyncio_subprocess_run(b"0"):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()

View File

@ -30,16 +30,15 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
import homeassistant.util.dt as dt_util
from . import mock_asyncio_subprocess_run
from tests.common import async_fire_time_changed
async def test_no_poll_when_cover_has_no_command_state(hass: HomeAssistant) -> None:
"""Test that the cover does not polls when there's no state command."""
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"50\n",
) as check_output:
with mock_asyncio_subprocess_run(b"50\n") as mock_subprocess_run:
assert await setup.async_setup_component(
hass,
COVER_DOMAIN,
@ -51,7 +50,7 @@ async def test_no_poll_when_cover_has_no_command_state(hass: HomeAssistant) -> N
)
async_fire_time_changed(hass, dt_util.utcnow() + SCAN_INTERVAL)
await hass.async_block_till_done()
assert not check_output.called
assert not mock_subprocess_run.called
@pytest.mark.parametrize(
@ -74,17 +73,13 @@ async def test_poll_when_cover_has_command_state(
) -> None:
"""Test that the cover polls when there's a state command."""
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"50\n",
) as check_output:
with mock_asyncio_subprocess_run(b"50\n") as mock_subprocess_run:
async_fire_time_changed(hass, dt_util.utcnow() + SCAN_INTERVAL)
await hass.async_block_till_done()
check_output.assert_called_once_with(
mock_subprocess_run.assert_called_once_with(
"echo state",
shell=True, # noqa: S604 # shell by design
timeout=15,
close_fds=False,
stdout=-1,
)
@ -379,10 +374,7 @@ async def test_availability(
hass.states.async_set("sensor.input1", "off")
await hass.async_block_till_done()
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"50\n",
):
with mock_asyncio_subprocess_run(b"50\n"):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()

View File

@ -3,7 +3,6 @@ from __future__ import annotations
import asyncio
from datetime import timedelta
import subprocess
from typing import Any
from unittest.mock import patch
@ -22,6 +21,8 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt as dt_util
from . import mock_asyncio_subprocess_run
from tests.common import async_fire_time_changed
@ -132,10 +133,7 @@ async def test_template_render_with_quote(hass: HomeAssistant) -> None:
)
await hass.async_block_till_done()
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"Works\n",
) as check_output:
with mock_asyncio_subprocess_run(b"Works\n") as mock_subprocess_run:
# Give time for template to load
async_fire_time_changed(
hass,
@ -143,11 +141,10 @@ async def test_template_render_with_quote(hass: HomeAssistant) -> None:
)
await hass.async_block_till_done()
assert len(check_output.mock_calls) == 1
check_output.assert_called_with(
assert len(mock_subprocess_run.mock_calls) == 1
mock_subprocess_run.assert_called_with(
'echo "sensor_value" "3 4"',
shell=True, # noqa: S604 # shell by design
timeout=15,
stdout=-1,
close_fds=False,
)
@ -679,10 +676,7 @@ async def test_template_not_error_when_data_is_none(
) -> None:
"""Test command sensor with template not logging error when data is None."""
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
side_effect=subprocess.CalledProcessError,
):
with mock_asyncio_subprocess_run(returncode=1):
await setup.async_setup_component(
hass,
DOMAIN,
@ -747,10 +741,7 @@ async def test_availability(
hass.states.async_set("sensor.input1", "off")
await hass.async_block_till_done()
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"January 17, 2022",
):
with mock_asyncio_subprocess_run(b"January 17, 2022"):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()

View File

@ -5,7 +5,6 @@ import asyncio
from datetime import timedelta
import json
import os
import subprocess
import tempfile
from unittest.mock import patch
@ -32,6 +31,8 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
import homeassistant.util.dt as dt_util
from . import mock_asyncio_subprocess_run
from tests.common import async_fire_time_changed
@ -374,13 +375,7 @@ async def test_switch_command_state_code_exceptions(
) -> None:
"""Test that switch state code exceptions are handled correctly."""
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
side_effect=[
subprocess.TimeoutExpired("cmd", 10),
subprocess.SubprocessError(),
],
) as check_output:
with mock_asyncio_subprocess_run(exception=asyncio.TimeoutError) as run:
await setup.async_setup_component(
hass,
DOMAIN,
@ -401,12 +396,13 @@ async def test_switch_command_state_code_exceptions(
async_fire_time_changed(hass, dt_util.utcnow() + SCAN_INTERVAL)
await hass.async_block_till_done()
assert check_output.called
assert run.called
assert "Timeout for command" in caplog.text
with mock_asyncio_subprocess_run(returncode=127) as run:
async_fire_time_changed(hass, dt_util.utcnow() + SCAN_INTERVAL * 2)
await hass.async_block_till_done()
assert check_output.called
assert run.called
assert "Error trying to exec command" in caplog.text
@ -415,13 +411,7 @@ async def test_switch_command_state_value_exceptions(
) -> None:
"""Test that switch state value exceptions are handled correctly."""
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
side_effect=[
subprocess.TimeoutExpired("cmd", 10),
subprocess.SubprocessError(),
],
) as check_output:
with mock_asyncio_subprocess_run(exception=asyncio.TimeoutError) as run:
await setup.async_setup_component(
hass,
DOMAIN,
@ -443,13 +433,14 @@ async def test_switch_command_state_value_exceptions(
async_fire_time_changed(hass, dt_util.utcnow() + SCAN_INTERVAL)
await hass.async_block_till_done()
assert check_output.call_count == 1
assert run.call_count == 1
assert "Timeout for command" in caplog.text
with mock_asyncio_subprocess_run(returncode=127) as run:
async_fire_time_changed(hass, dt_util.utcnow() + SCAN_INTERVAL * 2)
await hass.async_block_till_done()
assert check_output.call_count == 2
assert "Error trying to exec command" in caplog.text
assert run.call_count == 1
assert "Command failed (with return code 127)" in caplog.text
async def test_unique_id(
@ -750,10 +741,7 @@ async def test_availability(
hass.states.async_set("sensor.input1", "off")
await hass.async_block_till_done()
with patch(
"homeassistant.components.command_line.utils.subprocess.check_output",
return_value=b"50\n",
):
with mock_asyncio_subprocess_run(b"50\n"):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()