From 219cee2ca9f6cd9eb7e0abcbda6d9540240e20d3 Mon Sep 17 00:00:00 2001 From: epenet <6771947+epenet@users.noreply.github.com> Date: Wed, 14 Sep 2022 15:23:29 +0200 Subject: [PATCH] Move Trace classes to separate module (#78433) --- homeassistant/components/trace/__init__.py | 167 +----------------- homeassistant/components/trace/models.py | 170 +++++++++++++++++++ tests/components/trace/test_websocket_api.py | 6 +- 3 files changed, 175 insertions(+), 168 deletions(-) create mode 100644 homeassistant/components/trace/models.py diff --git a/homeassistant/components/trace/__init__.py b/homeassistant/components/trace/__init__.py index e6a10746a38..bb4f046a7e2 100644 --- a/homeassistant/components/trace/__init__.py +++ b/homeassistant/components/trace/__init__.py @@ -1,30 +1,17 @@ """Support for script and automation tracing and debugging.""" from __future__ import annotations -import abc -from collections import deque -import datetime as dt import logging -from typing import Any import voluptuous as vol from homeassistant.const import EVENT_HOMEASSISTANT_STOP -from homeassistant.core import Context, HomeAssistant +from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError import homeassistant.helpers.config_validation as cv from homeassistant.helpers.json import ExtendedJSONEncoder from homeassistant.helpers.storage import Store -from homeassistant.helpers.trace import ( - TraceElement, - script_execution_get, - trace_id_get, - trace_id_set, - trace_set_child_id, -) from homeassistant.helpers.typing import ConfigType -import homeassistant.util.dt as dt_util -import homeassistant.util.uuid as uuid_util from . import websocket_api from .const import ( @@ -34,6 +21,7 @@ from .const import ( DATA_TRACES_RESTORED, DEFAULT_STORED_TRACES, ) +from .models import ActionTrace, BaseTrace, RestoredTrace # noqa: F401 from .utils import LimitedSizeDict _LOGGER = logging.getLogger(__name__) @@ -187,154 +175,3 @@ async def async_restore_traces(hass: HomeAssistant) -> None: _LOGGER.exception("Failed to restore trace") continue _async_store_restored_trace(hass, trace) - - -class BaseTrace(abc.ABC): - """Base container for a script or automation trace.""" - - context: Context - key: str - - def as_dict(self) -> dict[str, Any]: - """Return an dictionary version of this ActionTrace for saving.""" - return { - "extended_dict": self.as_extended_dict(), - "short_dict": self.as_short_dict(), - } - - @abc.abstractmethod - def as_extended_dict(self) -> dict[str, Any]: - """Return an extended dictionary version of this ActionTrace.""" - - @abc.abstractmethod - def as_short_dict(self) -> dict[str, Any]: - """Return a brief dictionary version of this ActionTrace.""" - - -class ActionTrace(BaseTrace): - """Base container for a script or automation trace.""" - - _domain: str | None = None - - def __init__( - self, - item_id: str | None, - config: dict[str, Any] | None, - blueprint_inputs: dict[str, Any] | None, - context: Context, - ) -> None: - """Container for script trace.""" - self._trace: dict[str, deque[TraceElement]] | None = None - self._config = config - self._blueprint_inputs = blueprint_inputs - self.context: Context = context - self._error: Exception | None = None - self._state: str = "running" - self._script_execution: str | None = None - self.run_id: str = uuid_util.random_uuid_hex() - self._timestamp_finish: dt.datetime | None = None - self._timestamp_start: dt.datetime = dt_util.utcnow() - self.key = f"{self._domain}.{item_id}" - self._dict: dict[str, Any] | None = None - self._short_dict: dict[str, Any] | None = None - if trace_id_get(): - trace_set_child_id(self.key, self.run_id) - trace_id_set((self.key, self.run_id)) - - def set_trace(self, trace: dict[str, deque[TraceElement]] | None) -> None: - """Set action trace.""" - self._trace = trace - - def set_error(self, ex: Exception) -> None: - """Set error.""" - self._error = ex - - def finished(self) -> None: - """Set finish time.""" - self._timestamp_finish = dt_util.utcnow() - self._state = "stopped" - self._script_execution = script_execution_get() - - def as_extended_dict(self) -> dict[str, Any]: - """Return an extended dictionary version of this ActionTrace.""" - if self._dict: - return self._dict - - result = dict(self.as_short_dict()) - - traces = {} - if self._trace: - for key, trace_list in self._trace.items(): - traces[key] = [item.as_dict() for item in trace_list] - - result.update( - { - "trace": traces, - "config": self._config, - "blueprint_inputs": self._blueprint_inputs, - "context": self.context, - } - ) - - if self._state == "stopped": - # Execution has stopped, save the result - self._dict = result - return result - - def as_short_dict(self) -> dict[str, Any]: - """Return a brief dictionary version of this ActionTrace.""" - if self._short_dict: - return self._short_dict - - last_step = None - - if self._trace: - last_step = list(self._trace)[-1] - domain, item_id = self.key.split(".", 1) - - result = { - "last_step": last_step, - "run_id": self.run_id, - "state": self._state, - "script_execution": self._script_execution, - "timestamp": { - "start": self._timestamp_start, - "finish": self._timestamp_finish, - }, - "domain": domain, - "item_id": item_id, - } - if self._error is not None: - result["error"] = str(self._error) - - if self._state == "stopped": - # Execution has stopped, save the result - self._short_dict = result - return result - - -class RestoredTrace(BaseTrace): - """Container for a restored script or automation trace.""" - - def __init__(self, data: dict[str, Any]) -> None: - """Restore from dict.""" - extended_dict = data["extended_dict"] - short_dict = data["short_dict"] - context = Context( - user_id=extended_dict["context"]["user_id"], - parent_id=extended_dict["context"]["parent_id"], - id=extended_dict["context"]["id"], - ) - self.context = context - self.key = f"{extended_dict['domain']}.{extended_dict['item_id']}" - self.run_id = extended_dict["run_id"] - self._dict = extended_dict - self._short_dict = short_dict - - def as_extended_dict(self) -> dict[str, Any]: - """Return an extended dictionary version of this RestoredTrace.""" - return self._dict - - def as_short_dict(self) -> dict[str, Any]: - """Return a brief dictionary version of this RestoredTrace.""" - return self._short_dict diff --git a/homeassistant/components/trace/models.py b/homeassistant/components/trace/models.py new file mode 100644 index 00000000000..9530554449e --- /dev/null +++ b/homeassistant/components/trace/models.py @@ -0,0 +1,170 @@ +"""Containers for a script or automation trace.""" +from __future__ import annotations + +import abc +from collections import deque +import datetime as dt +from typing import Any + +from homeassistant.core import Context +from homeassistant.helpers.trace import ( + TraceElement, + script_execution_get, + trace_id_get, + trace_id_set, + trace_set_child_id, +) +import homeassistant.util.dt as dt_util +import homeassistant.util.uuid as uuid_util + + +class BaseTrace(abc.ABC): + """Base container for a script or automation trace.""" + + context: Context + key: str + run_id: str + + def as_dict(self) -> dict[str, Any]: + """Return an dictionary version of this ActionTrace for saving.""" + return { + "extended_dict": self.as_extended_dict(), + "short_dict": self.as_short_dict(), + } + + @abc.abstractmethod + def as_extended_dict(self) -> dict[str, Any]: + """Return an extended dictionary version of this ActionTrace.""" + + @abc.abstractmethod + def as_short_dict(self) -> dict[str, Any]: + """Return a brief dictionary version of this ActionTrace.""" + + +class ActionTrace(BaseTrace): + """Base container for a script or automation trace.""" + + _domain: str | None = None + + def __init__( + self, + item_id: str | None, + config: dict[str, Any] | None, + blueprint_inputs: dict[str, Any] | None, + context: Context, + ) -> None: + """Container for script trace.""" + self._trace: dict[str, deque[TraceElement]] | None = None + self._config = config + self._blueprint_inputs = blueprint_inputs + self.context: Context = context + self._error: Exception | None = None + self._state: str = "running" + self._script_execution: str | None = None + self.run_id: str = uuid_util.random_uuid_hex() + self._timestamp_finish: dt.datetime | None = None + self._timestamp_start: dt.datetime = dt_util.utcnow() + self.key = f"{self._domain}.{item_id}" + self._dict: dict[str, Any] | None = None + self._short_dict: dict[str, Any] | None = None + if trace_id_get(): + trace_set_child_id(self.key, self.run_id) + trace_id_set((self.key, self.run_id)) + + def set_trace(self, trace: dict[str, deque[TraceElement]] | None) -> None: + """Set action trace.""" + self._trace = trace + + def set_error(self, ex: Exception) -> None: + """Set error.""" + self._error = ex + + def finished(self) -> None: + """Set finish time.""" + self._timestamp_finish = dt_util.utcnow() + self._state = "stopped" + self._script_execution = script_execution_get() + + def as_extended_dict(self) -> dict[str, Any]: + """Return an extended dictionary version of this ActionTrace.""" + if self._dict: + return self._dict + + result = dict(self.as_short_dict()) + + traces = {} + if self._trace: + for key, trace_list in self._trace.items(): + traces[key] = [item.as_dict() for item in trace_list] + + result.update( + { + "trace": traces, + "config": self._config, + "blueprint_inputs": self._blueprint_inputs, + "context": self.context, + } + ) + + if self._state == "stopped": + # Execution has stopped, save the result + self._dict = result + return result + + def as_short_dict(self) -> dict[str, Any]: + """Return a brief dictionary version of this ActionTrace.""" + if self._short_dict: + return self._short_dict + + last_step = None + + if self._trace: + last_step = list(self._trace)[-1] + domain, item_id = self.key.split(".", 1) + + result = { + "last_step": last_step, + "run_id": self.run_id, + "state": self._state, + "script_execution": self._script_execution, + "timestamp": { + "start": self._timestamp_start, + "finish": self._timestamp_finish, + }, + "domain": domain, + "item_id": item_id, + } + if self._error is not None: + result["error"] = str(self._error) + + if self._state == "stopped": + # Execution has stopped, save the result + self._short_dict = result + return result + + +class RestoredTrace(BaseTrace): + """Container for a restored script or automation trace.""" + + def __init__(self, data: dict[str, Any]) -> None: + """Restore from dict.""" + extended_dict = data["extended_dict"] + short_dict = data["short_dict"] + context = Context( + user_id=extended_dict["context"]["user_id"], + parent_id=extended_dict["context"]["parent_id"], + id=extended_dict["context"]["id"], + ) + self.context = context + self.key = f"{extended_dict['domain']}.{extended_dict['item_id']}" + self.run_id = extended_dict["run_id"] + self._dict = extended_dict + self._short_dict = short_dict + + def as_extended_dict(self) -> dict[str, Any]: + """Return an extended dictionary version of this RestoredTrace.""" + return self._dict + + def as_short_dict(self) -> dict[str, Any]: + """Return a brief dictionary version of this RestoredTrace.""" + return self._short_dict diff --git a/tests/components/trace/test_websocket_api.py b/tests/components/trace/test_websocket_api.py index 21eefb14c1b..69d41968f9b 100644 --- a/tests/components/trace/test_websocket_api.py +++ b/tests/components/trace/test_websocket_api.py @@ -554,7 +554,7 @@ async def test_trace_overflow(hass, hass_ws_client, domain, stored_traces): # Trigger "moon" enough times to overflow the max number of stored traces with patch( - "homeassistant.components.trace.uuid_util.random_uuid_hex", + "homeassistant.components.trace.models.uuid_util.random_uuid_hex", wraps=mock_random_uuid_hex, ): for _ in range(stored_traces or DEFAULT_STORED_TRACES): @@ -628,7 +628,7 @@ async def test_restore_traces_overflow( # Trigger "moon" enough times to overflow the max number of stored traces with patch( - "homeassistant.components.trace.uuid_util.random_uuid_hex", + "homeassistant.components.trace.models.uuid_util.random_uuid_hex", wraps=mock_random_uuid_hex, ): for _ in range(DEFAULT_STORED_TRACES - num_restored_moon_traces + 1): @@ -698,7 +698,7 @@ async def test_restore_traces_late_overflow( # Trigger "moon" enough times to overflow the max number of stored traces with patch( - "homeassistant.components.trace.uuid_util.random_uuid_hex", + "homeassistant.components.trace.models.uuid_util.random_uuid_hex", wraps=mock_random_uuid_hex, ): for _ in range(DEFAULT_STORED_TRACES - num_restored_moon_traces + 1):