Refactor YAML loading to retrieve files from local filesystem or via vscode connection

This commit is contained in:
Guillermo Ruffino 2025-04-01 16:24:39 -03:00
parent 28a9f12595
commit cfabba9638
2 changed files with 47 additions and 24 deletions

View File

@ -78,6 +78,15 @@ def _print_file_read_event(path: str) -> None:
) )
def _loader(fname: str):
_print_file_read_event(fname)
raw_yaml_stream = StringIO(_read_file_content_from_json_on_stdin())
# it is required to set the name on StringIO so document on start_mark
# is set properly. Otherwise it is initialized with "<file>"
raw_yaml_stream.name = fname
return parse_yaml(fname, raw_yaml_stream, _loader)
def read_config(args): def read_config(args):
while True: while True:
CORE.reset() CORE.reset()
@ -92,14 +101,12 @@ def read_config(args):
CORE.config_path = data["file"] CORE.config_path = data["file"]
file_name = CORE.config_path file_name = CORE.config_path
_print_file_read_event(file_name)
raw_yaml = _read_file_content_from_json_on_stdin()
command_line_substitutions: dict[str, Any] = ( command_line_substitutions: dict[str, Any] = (
dict(args.substitution) if args.substitution else {} dict(args.substitution) if args.substitution else {}
) )
vs = VSCodeResult() vs = VSCodeResult()
try: try:
config = parse_yaml(file_name, StringIO(raw_yaml)) config = _loader(file_name)
res = validate_config(config, command_line_substitutions) res = validate_config(config, command_line_substitutions)
except Exception as err: # pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
vs.add_yaml_error(str(err)) vs.add_yaml_error(str(err))

View File

@ -102,6 +102,10 @@ def _add_data_ref(fn):
class ESPHomeLoaderMixin: class ESPHomeLoaderMixin:
"""Loader class that keeps track of line numbers.""" """Loader class that keeps track of line numbers."""
def __init__(self, name, yaml_loader):
self.name = name
self.yaml_loader = yaml_loader
@_add_data_ref @_add_data_ref
def construct_yaml_int(self, node): def construct_yaml_int(self, node):
return super().construct_yaml_int(node) return super().construct_yaml_int(node)
@ -252,14 +256,14 @@ class ESPHomeLoaderMixin:
@_add_data_ref @_add_data_ref
def construct_secret(self, node): def construct_secret(self, node):
try: try:
secrets = _load_yaml_internal(self._rel_path(SECRET_YAML)) secrets = self.yaml_loader(self._rel_path(SECRET_YAML))
except EsphomeError as e: except EsphomeError as e:
if self.name == CORE.config_path: if self.name == CORE.config_path:
raise e raise e
try: try:
main_config_dir = os.path.dirname(CORE.config_path) main_config_dir = os.path.dirname(CORE.config_path)
main_secret_yml = os.path.join(main_config_dir, SECRET_YAML) main_secret_yml = os.path.join(main_config_dir, SECRET_YAML)
secrets = _load_yaml_internal(main_secret_yml) secrets = self.yaml_loader(main_secret_yml)
except EsphomeError as er: except EsphomeError as er:
raise EsphomeError(f"{e}\n{er}") from er raise EsphomeError(f"{e}\n{er}") from er
@ -290,7 +294,7 @@ class ESPHomeLoaderMixin:
else: else:
file, vars = node.value, None file, vars = node.value, None
result = _load_yaml_internal(self._rel_path(file)) result = self.yaml_loader(self._rel_path(file))
if not vars: if not vars:
vars = {} vars = {}
result = substitute_vars(result, vars) result = substitute_vars(result, vars)
@ -299,14 +303,14 @@ class ESPHomeLoaderMixin:
@_add_data_ref @_add_data_ref
def construct_include_dir_list(self, node): def construct_include_dir_list(self, node):
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml")) files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
return [_load_yaml_internal(f) for f in files] return [self.yaml_loader(f) for f in files]
@_add_data_ref @_add_data_ref
def construct_include_dir_merge_list(self, node): def construct_include_dir_merge_list(self, node):
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml")) files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
merged_list = [] merged_list = []
for fname in files: for fname in files:
loaded_yaml = _load_yaml_internal(fname) loaded_yaml = self.yaml_loader(fname)
if isinstance(loaded_yaml, list): if isinstance(loaded_yaml, list):
merged_list.extend(loaded_yaml) merged_list.extend(loaded_yaml)
return merged_list return merged_list
@ -317,7 +321,7 @@ class ESPHomeLoaderMixin:
mapping = OrderedDict() mapping = OrderedDict()
for fname in files: for fname in files:
filename = os.path.splitext(os.path.basename(fname))[0] filename = os.path.splitext(os.path.basename(fname))[0]
mapping[filename] = _load_yaml_internal(fname) mapping[filename] = self.yaml_loader(fname)
return mapping return mapping
@_add_data_ref @_add_data_ref
@ -325,7 +329,7 @@ class ESPHomeLoaderMixin:
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml")) files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
mapping = OrderedDict() mapping = OrderedDict()
for fname in files: for fname in files:
loaded_yaml = _load_yaml_internal(fname) loaded_yaml = self.yaml_loader(fname)
if isinstance(loaded_yaml, dict): if isinstance(loaded_yaml, dict):
mapping.update(loaded_yaml) mapping.update(loaded_yaml)
return mapping return mapping
@ -351,10 +355,18 @@ class ESPHomeLoaderMixin:
class ESPHomeLoader(ESPHomeLoaderMixin, FastestAvailableSafeLoader): class ESPHomeLoader(ESPHomeLoaderMixin, FastestAvailableSafeLoader):
"""Loader class that keeps track of line numbers.""" """Loader class that keeps track of line numbers."""
def __init__(self, stream, name, yaml_loader):
FastestAvailableSafeLoader.__init__(self, stream)
ESPHomeLoaderMixin.__init__(self, name, yaml_loader)
class ESPHomePurePythonLoader(ESPHomeLoaderMixin, PurePythonLoader): class ESPHomePurePythonLoader(ESPHomeLoaderMixin, PurePythonLoader):
"""Loader class that keeps track of line numbers.""" """Loader class that keeps track of line numbers."""
def __init__(self, stream, name, yaml_loader):
PurePythonLoader.__init__(self, stream)
ESPHomeLoaderMixin.__init__(self, name, yaml_loader)
for _loader in (ESPHomeLoader, ESPHomePurePythonLoader): for _loader in (ESPHomeLoader, ESPHomePurePythonLoader):
_loader.add_constructor("tag:yaml.org,2002:int", _loader.construct_yaml_int) _loader.add_constructor("tag:yaml.org,2002:int", _loader.construct_yaml_int)
@ -388,17 +400,30 @@ def load_yaml(fname: str, clear_secrets: bool = True) -> Any:
return _load_yaml_internal(fname) return _load_yaml_internal(fname)
def parse_yaml(file_name: str, file_handle: TextIOWrapper) -> Any: def _load_yaml_internal(fname: str) -> Any:
"""Load a YAML file."""
try:
with open(fname, encoding="utf-8") as f_handle:
return parse_yaml(fname, f_handle)
except (UnicodeDecodeError, OSError) as err:
raise EsphomeError(f"Error reading file {fname}: {err}") from err
def parse_yaml(
file_name: str, file_handle: TextIOWrapper, yaml_loader=_load_yaml_internal
) -> Any:
"""Parse a YAML file.""" """Parse a YAML file."""
try: try:
return _load_yaml_internal_with_type(ESPHomeLoader, file_name, file_handle) return _load_yaml_internal_with_type(
ESPHomeLoader, file_name, file_handle, yaml_loader
)
except EsphomeError: except EsphomeError:
# Loading failed, so we now load with the Python loader which has more # Loading failed, so we now load with the Python loader which has more
# readable exceptions # readable exceptions
# Rewind the stream so we can try again # Rewind the stream so we can try again
file_handle.seek(0, 0) file_handle.seek(0, 0)
return _load_yaml_internal_with_type( return _load_yaml_internal_with_type(
ESPHomePurePythonLoader, file_name, file_handle ESPHomePurePythonLoader, file_name, file_handle, yaml_loader
) )
@ -435,23 +460,14 @@ def substitute_vars(config, vars):
return result return result
def _load_yaml_internal(fname: str) -> Any:
"""Load a YAML file."""
try:
with open(fname, encoding="utf-8") as f_handle:
return parse_yaml(fname, f_handle)
except (UnicodeDecodeError, OSError) as err:
raise EsphomeError(f"Error reading file {fname}: {err}") from err
def _load_yaml_internal_with_type( def _load_yaml_internal_with_type(
loader_type: type[ESPHomeLoader] | type[ESPHomePurePythonLoader], loader_type: type[ESPHomeLoader] | type[ESPHomePurePythonLoader],
fname: str, fname: str,
content: TextIOWrapper, content: TextIOWrapper,
yaml_loader: Any,
) -> Any: ) -> Any:
"""Load a YAML file.""" """Load a YAML file."""
loader = loader_type(content) loader = loader_type(content, fname, yaml_loader)
loader.name = fname
try: try:
return loader.get_single_data() or OrderedDict() return loader.get_single_data() or OrderedDict()
except yaml.YAMLError as exc: except yaml.YAMLError as exc: