Improve reading clarity of steps code in scripts helper (#134395)

* Reorganize steps code in scripts helper

* Address feedback

* Revert to getattr
This commit is contained in:
Artur Pragacz 2025-02-19 19:37:36 +01:00 committed by GitHub
parent d2ce89882b
commit 7117708937
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -430,9 +430,6 @@ class _ScriptRun:
if not self._stop.done():
self._script._changed() # noqa: SLF001
async def _async_get_condition(self, config: ConfigType) -> ConditionCheckerType:
return await self._script._async_get_condition(config) # noqa: SLF001
def _log(
self, msg: str, *args: Any, level: int = logging.INFO, **kwargs: Any
) -> None:
@ -521,7 +518,7 @@ class _ScriptRun:
trace_set_result(enabled=False)
return
handler = f"_async_{action}_step"
handler = f"_async_step_{action}"
try:
await getattr(self, handler)()
except Exception as ex: # noqa: BLE001
@ -627,111 +624,49 @@ class _ScriptRun:
except ScriptStoppedError as ex:
raise asyncio.CancelledError from ex
async def _async_call_service_step(self) -> None:
"""Call the service specified in the action."""
self._step_log("call service")
params = service.async_prepare_call_from_config(
self._hass, self._action, self._variables
)
# Validate response data parameters. This check ignores services that do
# not exist which will raise an appropriate error in the service call below.
response_variable = self._action.get(CONF_RESPONSE_VARIABLE)
return_response = response_variable is not None
if self._hass.services.has_service(params[CONF_DOMAIN], params[CONF_SERVICE]):
supports_response = self._hass.services.supports_response(
params[CONF_DOMAIN], params[CONF_SERVICE]
)
if supports_response == SupportsResponse.ONLY and not return_response:
raise vol.Invalid(
f"Script requires '{CONF_RESPONSE_VARIABLE}' for response data "
f"for service call {params[CONF_DOMAIN]}.{params[CONF_SERVICE]}"
)
if supports_response == SupportsResponse.NONE and return_response:
raise vol.Invalid(
f"Script does not support '{CONF_RESPONSE_VARIABLE}' for service "
f"'{CONF_RESPONSE_VARIABLE}' which does not support response data."
)
running_script = (
params[CONF_DOMAIN] == "automation" and params[CONF_SERVICE] == "trigger"
) or params[CONF_DOMAIN] in ("python_script", "script")
trace_set_result(params=params, running_script=running_script)
response_data = await self._async_run_long_action(
async def _async_run_script(self, script: Script) -> None:
"""Execute a script."""
result = await self._async_run_long_action(
self._hass.async_create_task_internal(
self._hass.services.async_call(
**params,
blocking=True,
context=self._context,
return_response=return_response,
),
eager_start=True,
script.async_run(self._variables, self._context), eager_start=True
)
)
if response_variable:
self._variables[response_variable] = response_data
if result and result.conversation_response is not UNDEFINED:
self._conversation_response = result.conversation_response
async def _async_device_step(self) -> None:
"""Perform the device automation specified in the action."""
self._step_log("device automation")
await device_action.async_call_action_from_config(
self._hass, self._action, self._variables, self._context
## Flow control actions ##
### Sequence actions ###
@async_trace_path("parallel")
async def _async_step_parallel(self) -> None:
"""Run a sequence in parallel."""
scripts = await self._script._async_get_parallel_scripts(self._step) # noqa: SLF001
async def async_run_with_trace(idx: int, script: Script) -> None:
"""Run a script with a trace path."""
trace_path_stack_cv.set(copy(trace_path_stack_cv.get()))
with trace_path([str(idx), "sequence"]):
await self._async_run_script(script)
results = await asyncio.gather(
*(async_run_with_trace(idx, script) for idx, script in enumerate(scripts)),
return_exceptions=True,
)
for result in results:
if isinstance(result, Exception):
raise result
async def _async_scene_step(self) -> None:
"""Activate the scene specified in the action."""
self._step_log("activate scene")
trace_set_result(scene=self._action[CONF_SCENE])
await self._hass.services.async_call(
scene.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: self._action[CONF_SCENE]},
blocking=True,
context=self._context,
)
@async_trace_path("sequence")
async def _async_step_sequence(self) -> None:
"""Run a sequence."""
sequence = await self._script._async_get_sequence_script(self._step) # noqa: SLF001
await self._async_run_script(sequence)
async def _async_event_step(self) -> None:
"""Fire an event."""
self._step_log(self._action.get(CONF_ALIAS, self._action[CONF_EVENT]))
event_data = {}
for conf in (CONF_EVENT_DATA, CONF_EVENT_DATA_TEMPLATE):
if conf not in self._action:
continue
### Condition actions ###
try:
event_data.update(
template.render_complex(self._action[conf], self._variables)
)
except exceptions.TemplateError as ex:
self._log(
"Error rendering event data template: %s", ex, level=logging.ERROR
)
trace_set_result(event=self._action[CONF_EVENT], event_data=event_data)
self._hass.bus.async_fire_internal(
self._action[CONF_EVENT], event_data, context=self._context
)
async def _async_condition_step(self) -> None:
"""Test if condition is matching."""
self._script.last_action = self._action.get(
CONF_ALIAS, self._action[CONF_CONDITION]
)
cond = await self._async_get_condition(self._action)
try:
trace_element = trace_stack_top(trace_stack_cv)
if trace_element:
trace_element.reuse_by_child = True
check = cond(self._hass, self._variables)
except exceptions.ConditionError as ex:
_LOGGER.warning("Error in 'condition' evaluation:\n%s", ex)
check = False
self._log("Test condition %s: %s", self._script.last_action, check)
trace_update_result(result=check)
if not check:
raise _ConditionFail
async def _async_get_condition(self, config: ConfigType) -> ConditionCheckerType:
return await self._script._async_get_condition(config) # noqa: SLF001
def _test_conditions(
self,
@ -760,8 +695,73 @@ class _ScriptRun:
return traced_test_conditions(self._hass, self._variables)
async def _async_step_choose(self) -> None:
"""Choose a sequence."""
choose_data = await self._script._async_get_choose_data(self._step) # noqa: SLF001
with trace_path("choose"):
for idx, (conditions, script) in enumerate(choose_data["choices"]):
with trace_path(str(idx)):
try:
if self._test_conditions(conditions, "choose", "conditions"):
trace_set_result(choice=idx)
with trace_path("sequence"):
await self._async_run_script(script)
return
except exceptions.ConditionError as ex:
_LOGGER.warning("Error in 'choose' evaluation:\n%s", ex)
if choose_data["default"] is not None:
trace_set_result(choice="default")
with trace_path(["default"]):
await self._async_run_script(choose_data["default"])
async def _async_step_condition(self) -> None:
"""Test if condition is matching."""
self._script.last_action = self._action.get(
CONF_ALIAS, self._action[CONF_CONDITION]
)
cond = await self._async_get_condition(self._action)
try:
trace_element = trace_stack_top(trace_stack_cv)
if trace_element:
trace_element.reuse_by_child = True
check = cond(self._hass, self._variables)
except exceptions.ConditionError as ex:
_LOGGER.warning("Error in 'condition' evaluation:\n%s", ex)
check = False
self._log("Test condition %s: %s", self._script.last_action, check)
trace_update_result(result=check)
if not check:
raise _ConditionFail
async def _async_step_if(self) -> None:
"""If sequence."""
if_data = await self._script._async_get_if_data(self._step) # noqa: SLF001
test_conditions: bool | None = False
try:
with trace_path("if"):
test_conditions = self._test_conditions(
if_data["if_conditions"], "if", "condition"
)
except exceptions.ConditionError as ex:
_LOGGER.warning("Error in 'if' evaluation:\n%s", ex)
if test_conditions:
trace_set_result(choice="then")
with trace_path("then"):
await self._async_run_script(if_data["if_then"])
return
if if_data["if_else"] is not None:
trace_set_result(choice="else")
with trace_path("else"):
await self._async_run_script(if_data["if_else"])
@async_trace_path("repeat")
async def _async_repeat_step(self) -> None: # noqa: C901
async def _async_step_repeat(self) -> None: # noqa: C901
"""Repeat a sequence."""
description = self._action.get(CONF_ALIAS, "sequence")
repeat = self._action[CONF_REPEAT]
@ -932,52 +932,128 @@ class _ScriptRun:
else:
self._variables.pop("repeat", None) # Not set if count = 0
async def _async_choose_step(self) -> None:
"""Choose a sequence."""
choose_data = await self._script._async_get_choose_data(self._step) # noqa: SLF001
### Stop actions ###
with trace_path("choose"):
for idx, (conditions, script) in enumerate(choose_data["choices"]):
with trace_path(str(idx)):
try:
if self._test_conditions(conditions, "choose", "conditions"):
trace_set_result(choice=idx)
with trace_path("sequence"):
await self._async_run_script(script)
return
except exceptions.ConditionError as ex:
_LOGGER.warning("Error in 'choose' evaluation:\n%s", ex)
async def _async_step_stop(self) -> None:
"""Stop script execution."""
stop = self._action[CONF_STOP]
error = self._action.get(CONF_ERROR, False)
trace_set_result(stop=stop, error=error)
if error:
self._log("Error script sequence: %s", stop)
raise _AbortScript(stop)
if choose_data["default"] is not None:
trace_set_result(choice="default")
with trace_path(["default"]):
await self._async_run_script(choose_data["default"])
self._log("Stop script sequence: %s", stop)
if CONF_RESPONSE_VARIABLE in self._action:
try:
response = self._variables[self._action[CONF_RESPONSE_VARIABLE]]
except KeyError as ex:
raise _AbortScript(
f"Response variable '{self._action[CONF_RESPONSE_VARIABLE]}' "
"is not defined"
) from ex
else:
response = None
raise _StopScript(stop, response)
async def _async_if_step(self) -> None:
"""If sequence."""
if_data = await self._script._async_get_if_data(self._step) # noqa: SLF001
## Variable actions ##
test_conditions: bool | None = False
try:
with trace_path("if"):
test_conditions = self._test_conditions(
if_data["if_conditions"], "if", "condition"
async def _async_step_variables(self) -> None:
"""Set a variable value."""
self._step_log("setting variables")
self._variables = self._action[CONF_VARIABLES].async_render(
self._hass, self._variables, render_as_defaults=False
)
## External actions ##
async def _async_step_call_service(self) -> None:
"""Call the service specified in the action."""
self._step_log("call service")
params = service.async_prepare_call_from_config(
self._hass, self._action, self._variables
)
# Validate response data parameters. This check ignores services that do
# not exist which will raise an appropriate error in the service call below.
response_variable = self._action.get(CONF_RESPONSE_VARIABLE)
return_response = response_variable is not None
if self._hass.services.has_service(params[CONF_DOMAIN], params[CONF_SERVICE]):
supports_response = self._hass.services.supports_response(
params[CONF_DOMAIN], params[CONF_SERVICE]
)
if supports_response == SupportsResponse.ONLY and not return_response:
raise vol.Invalid(
f"Script requires '{CONF_RESPONSE_VARIABLE}' for response data "
f"for service call {params[CONF_DOMAIN]}.{params[CONF_SERVICE]}"
)
if supports_response == SupportsResponse.NONE and return_response:
raise vol.Invalid(
f"Script does not support '{CONF_RESPONSE_VARIABLE}' for service "
f"'{CONF_RESPONSE_VARIABLE}' which does not support response data."
)
except exceptions.ConditionError as ex:
_LOGGER.warning("Error in 'if' evaluation:\n%s", ex)
if test_conditions:
trace_set_result(choice="then")
with trace_path("then"):
await self._async_run_script(if_data["if_then"])
return
running_script = (
params[CONF_DOMAIN] == "automation" and params[CONF_SERVICE] == "trigger"
) or params[CONF_DOMAIN] in ("python_script", "script")
trace_set_result(params=params, running_script=running_script)
response_data = await self._async_run_long_action(
self._hass.async_create_task_internal(
self._hass.services.async_call(
**params,
blocking=True,
context=self._context,
return_response=return_response,
),
eager_start=True,
)
)
if response_variable:
self._variables[response_variable] = response_data
if if_data["if_else"] is not None:
trace_set_result(choice="else")
with trace_path("else"):
await self._async_run_script(if_data["if_else"])
async def _async_step_device(self) -> None:
"""Perform the device automation specified in the action."""
self._step_log("device automation")
await device_action.async_call_action_from_config(
self._hass, self._action, self._variables, self._context
)
## Time-based steps ##
async def _async_step_event(self) -> None:
"""Fire an event."""
self._step_log(self._action.get(CONF_ALIAS, self._action[CONF_EVENT]))
event_data = {}
for conf in (CONF_EVENT_DATA, CONF_EVENT_DATA_TEMPLATE):
if conf not in self._action:
continue
try:
event_data.update(
template.render_complex(self._action[conf], self._variables)
)
except exceptions.TemplateError as ex:
self._log(
"Error rendering event data template: %s", ex, level=logging.ERROR
)
trace_set_result(event=self._action[CONF_EVENT], event_data=event_data)
self._hass.bus.async_fire_internal(
self._action[CONF_EVENT], event_data, context=self._context
)
async def _async_step_scene(self) -> None:
"""Activate the scene specified in the action."""
self._step_log("activate scene")
trace_set_result(scene=self._action[CONF_SCENE])
await self._hass.services.async_call(
scene.DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: self._action[CONF_SCENE]},
blocking=True,
context=self._context,
)
## Time-based actions ##
@overload
def _async_futures_with_timeout(
@ -1040,7 +1116,7 @@ class _ScriptRun:
)
raise _AbortScript from ex
async def _async_delay_step(self) -> None:
async def _async_step_delay(self) -> None:
"""Handle delay."""
delay_delta = self._get_pos_time_period_template(CONF_DELAY)
@ -1107,7 +1183,7 @@ class _ScriptRun:
else:
wait_var["remaining"] = None
async def _async_wait_for_trigger_step(self) -> None:
async def _async_step_wait_for_trigger(self) -> None:
"""Wait for a trigger event."""
timeout = self._get_timeout_seconds_from_action()
@ -1159,7 +1235,7 @@ class _ScriptRun:
futures, timeout_handle, timeout_future, remove_triggers
)
async def _async_wait_template_step(self) -> None:
async def _async_step_wait_template(self) -> None:
"""Handle a wait template."""
timeout = self._get_timeout_seconds_from_action()
self._step_log("wait template", timeout)
@ -1203,14 +1279,9 @@ class _ScriptRun:
futures, timeout_handle, timeout_future, unsub
)
async def _async_variables_step(self) -> None:
"""Set a variable value."""
self._step_log("setting variables")
self._variables = self._action[CONF_VARIABLES].async_render(
self._hass, self._variables, render_as_defaults=False
)
## Conversation actions ##
async def _async_set_conversation_response_step(self) -> None:
async def _async_step_set_conversation_response(self) -> None:
"""Set conversation response."""
self._step_log("setting conversation response")
resp: template.Template | None = self._action[CONF_SET_CONVERSATION_RESPONSE]
@ -1222,63 +1293,6 @@ class _ScriptRun:
)
trace_set_result(conversation_response=self._conversation_response)
async def _async_stop_step(self) -> None:
"""Stop script execution."""
stop = self._action[CONF_STOP]
error = self._action.get(CONF_ERROR, False)
trace_set_result(stop=stop, error=error)
if error:
self._log("Error script sequence: %s", stop)
raise _AbortScript(stop)
self._log("Stop script sequence: %s", stop)
if CONF_RESPONSE_VARIABLE in self._action:
try:
response = self._variables[self._action[CONF_RESPONSE_VARIABLE]]
except KeyError as ex:
raise _AbortScript(
f"Response variable '{self._action[CONF_RESPONSE_VARIABLE]}' "
"is not defined"
) from ex
else:
response = None
raise _StopScript(stop, response)
@async_trace_path("sequence")
async def _async_sequence_step(self) -> None:
"""Run a sequence."""
sequence = await self._script._async_get_sequence_script(self._step) # noqa: SLF001
await self._async_run_script(sequence)
@async_trace_path("parallel")
async def _async_parallel_step(self) -> None:
"""Run a sequence in parallel."""
scripts = await self._script._async_get_parallel_scripts(self._step) # noqa: SLF001
async def async_run_with_trace(idx: int, script: Script) -> None:
"""Run a script with a trace path."""
trace_path_stack_cv.set(copy(trace_path_stack_cv.get()))
with trace_path([str(idx), "sequence"]):
await self._async_run_script(script)
results = await asyncio.gather(
*(async_run_with_trace(idx, script) for idx, script in enumerate(scripts)),
return_exceptions=True,
)
for result in results:
if isinstance(result, Exception):
raise result
async def _async_run_script(self, script: Script) -> None:
"""Execute a script."""
result = await self._async_run_long_action(
self._hass.async_create_task_internal(
script.async_run(self._variables, self._context), eager_start=True
)
)
if result and result.conversation_response is not UNDEFINED:
self._conversation_response = result.conversation_response
class _QueuedScriptRun(_ScriptRun):
"""Manage queued Script sequence run."""