Mirror of https://github.com/home-assistant/core.git (synced 2025-04-19 14:57:52 +00:00)
Split out yaml loading into own package (#140683)
* Split out yaml loading into library
* Code review
* Code review
* Fix check config script
parent 3a6ddcf428
commit 7b9ea63f17
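Overview: this commit moves YAML loading out of homeassistant.util.yaml and into the standalone annotatedyaml package (pinned at 0.1.1 below); homeassistant.util.yaml stays behind as a thin re-export shim so existing imports keep working. A minimal usage sketch, not part of the diff, assuming the annotatedyaml API mirrors the former homeassistant.util.yaml.loader API as the shim modules below suggest (the config path is made up for illustration):

    from pathlib import Path

    from annotatedyaml import loader as yaml_loader
    from annotatedyaml.loader import Secrets

    config_dir = Path("/config")  # hypothetical Home Assistant config directory
    secrets = Secrets(config_dir)  # resolves !secret references from secrets.yaml
    data = yaml_loader.load_yaml_dict(str(config_dir / "configuration.yaml"), secrets)
    print(sorted(data))  # top-level keys of the loaded configuration mapping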
@@ -8,6 +8,7 @@ import os
from pathlib import Path
from typing import NamedTuple, Self

from annotatedyaml import loader as yaml_loader
import voluptuous as vol

from homeassistant import loader
@@ -29,7 +30,6 @@ from homeassistant.requirements import (
    async_clear_install_history,
    async_get_integration_with_requirements,
)
from homeassistant.util.yaml import loader as yaml_loader

from . import config_validation as cv
from .typing import ConfigType
@@ -10,6 +10,7 @@ aiohttp==3.11.13
aiohttp_cors==0.7.0
aiousbwatcher==1.1.1
aiozoneinfo==0.2.3
annotatedyaml==0.1.1
astral==2.2
async-interrupt==1.2.2
async-upnp-client==0.43.0
@@ -12,6 +12,9 @@ import os
from typing import Any
from unittest.mock import patch

from annotatedyaml import loader as yaml_loader
from annotatedyaml.loader import Secrets

from homeassistant import core, loader
from homeassistant.config import get_default_config_dir
from homeassistant.config_entries import ConfigEntries
@@ -23,7 +26,6 @@ from homeassistant.helpers import (
    issue_registry as ir,
)
from homeassistant.helpers.check_config import async_check_ha_config_file
from homeassistant.util.yaml import Secrets, loader as yaml_loader

# mypy: allow-untyped-calls, allow-untyped-defs

@@ -31,9 +33,9 @@ REQUIREMENTS = ("colorlog==6.8.2",)

_LOGGER = logging.getLogger(__name__)
MOCKS: dict[str, tuple[str, Callable]] = {
    "load": ("homeassistant.util.yaml.loader.load_yaml", yaml_loader.load_yaml),
    "load": ("annotatedyaml.loader.load_yaml", yaml_loader.load_yaml),
    "load*": ("homeassistant.config.load_yaml_dict", yaml_loader.load_yaml_dict),
    "secrets": ("homeassistant.util.yaml.loader.secret_yaml", yaml_loader.secret_yaml),
    "secrets": ("annotatedyaml.loader.secret_yaml", yaml_loader.secret_yaml),
}

PATCHES: dict[str, Any] = {}
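Note (not part of the diff): the MOCKS table above shows that anything monkeypatching Home Assistant's YAML loading now has to target the annotatedyaml module path instead of homeassistant.util.yaml.loader. A small sketch of the new patch target, mirroring the side_effect pattern used by the tests later in this diff:

    from unittest.mock import patch

    from annotatedyaml.loader import load_yaml

    # Old target: "homeassistant.util.yaml.loader.load_yaml"
    # New target: "annotatedyaml.loader.load_yaml"
    with patch("annotatedyaml.loader.load_yaml", side_effect=load_yaml) as mock_load_yaml:
        ...  # run code under test that loads YAML files
    print(mock_load_yaml.call_count)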
@@ -1,17 +1,11 @@
"""YAML utility functions."""

from .const import SECRET_YAML
from annotatedyaml import SECRET_YAML, YamlTypeError
from annotatedyaml.input import UndefinedSubstitution, extract_inputs, substitute
from annotatedyaml.objects import Input

from .dumper import dump, save_yaml
from .input import UndefinedSubstitution, extract_inputs, substitute
from .loader import (
    Secrets,
    YamlTypeError,
    load_yaml,
    load_yaml_dict,
    parse_yaml,
    secret_yaml,
)
from .objects import Input
from .loader import Secrets, load_yaml, load_yaml_dict, parse_yaml, secret_yaml

__all__ = [
    "SECRET_YAML",
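The hunk above turns homeassistant/util/yaml/__init__.py into a re-export shim: SECRET_YAML, YamlTypeError, Input, and the input helpers now come from annotatedyaml, while Secrets and the load/parse functions still come from the local .loader wrapper. A hedged illustration, assuming the re-exports are plain aliases and using the "secrets.yaml" value from the deleted const.py just below:

    from annotatedyaml import SECRET_YAML as upstream_secret_yaml
    from homeassistant.util.yaml import SECRET_YAML

    # Same constant, reachable through either import path
    assert SECRET_YAML == upstream_secret_yaml == "secrets.yaml"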
@@ -1,3 +0,0 @@
"""Constants."""

SECRET_YAML = "secrets.yaml"
@@ -1,96 +1,5 @@
"""Custom dumper and representers."""

from collections import OrderedDict
from typing import Any
from annotatedyaml.dumper import add_representer, dump, represent_odict, save_yaml

import yaml

from .objects import Input, NodeDictClass, NodeListClass, NodeStrClass

# mypy: allow-untyped-calls, no-warn-return-any


try:
    from yaml import CSafeDumper as FastestAvailableSafeDumper
except ImportError:
    from yaml import (  # type: ignore[assignment]
        SafeDumper as FastestAvailableSafeDumper,
    )


def dump(_dict: dict | list) -> str:
    """Dump YAML to a string and remove null."""
    return yaml.dump(
        _dict,
        default_flow_style=False,
        allow_unicode=True,
        sort_keys=False,
        Dumper=FastestAvailableSafeDumper,
    ).replace(": null\n", ":\n")


def save_yaml(path: str, data: dict) -> None:
    """Save YAML to a file."""
    # Dump before writing to not truncate the file if dumping fails
    str_data = dump(data)
    with open(path, "w", encoding="utf-8") as outfile:
        outfile.write(str_data)


# From: https://gist.github.com/miracle2k/3184458
def represent_odict(  # type: ignore[no-untyped-def]
    dumper, tag, mapping, flow_style=None
) -> yaml.MappingNode:
    """Like BaseRepresenter.represent_mapping but does not issue the sort()."""
    value: list = []
    node = yaml.MappingNode(tag, value, flow_style=flow_style)
    if dumper.alias_key is not None:
        dumper.represented_objects[dumper.alias_key] = node
    best_style = True
    if hasattr(mapping, "items"):
        mapping = mapping.items()
    for item_key, item_value in mapping:
        node_key = dumper.represent_data(item_key)
        node_value = dumper.represent_data(item_value)
        if not (isinstance(node_key, yaml.ScalarNode) and not node_key.style):
            best_style = False
        if not (isinstance(node_value, yaml.ScalarNode) and not node_value.style):
            best_style = False
        value.append((node_key, node_value))
    if flow_style is None:
        if dumper.default_flow_style is not None:
            node.flow_style = dumper.default_flow_style
        else:
            node.flow_style = best_style
    return node


def add_representer(klass: Any, representer: Any) -> None:
    """Add to representer to the dumper."""
    FastestAvailableSafeDumper.add_representer(klass, representer)


add_representer(
    OrderedDict,
    lambda dumper, value: represent_odict(dumper, "tag:yaml.org,2002:map", value),
)

add_representer(
    NodeDictClass,
    lambda dumper, value: represent_odict(dumper, "tag:yaml.org,2002:map", value),
)

add_representer(
    NodeListClass,
    lambda dumper, value: dumper.represent_sequence("tag:yaml.org,2002:seq", value),
)

add_representer(
    NodeStrClass,
    lambda dumper, value: dumper.represent_scalar("tag:yaml.org,2002:str", str(value)),
)

add_representer(
    Input,
    lambda dumper, value: dumper.represent_scalar("!input", value.name),
)
__all__ = ["add_representer", "dump", "represent_odict", "save_yaml"]
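The dumper module above is reduced to re-exports of add_representer, dump, represent_odict and save_yaml from annotatedyaml.dumper. A hedged usage sketch, assuming the library keeps the behaviour of the removed implementation (dump() rewrites ": null" values to bare keys; the output path is illustrative):

    from annotatedyaml.dumper import dump, save_yaml

    text = dump({"light": None, "name": "kitchen"})
    print(text)  # "light:\nname: kitchen\n" if the null-stripping behaviour is unchanged
    save_yaml("/tmp/example.yaml", {"name": "kitchen"})  # dumps first, then writes the file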
@@ -2,55 +2,8 @@

from __future__ import annotations

from typing import Any
from annotatedyaml.input import UndefinedSubstitution, extract_inputs, substitute

from .objects import Input


class UndefinedSubstitution(Exception):
    """Error raised when we find a substitution that is not defined."""

    def __init__(self, input_name: str) -> None:
        """Initialize the undefined substitution exception."""
        super().__init__(f"No substitution found for input {input_name}")
        self.input = input


def extract_inputs(obj: Any) -> set[str]:
    """Extract input from a structure."""
    found: set[str] = set()
    _extract_inputs(obj, found)
    return found


def _extract_inputs(obj: Any, found: set[str]) -> None:
    """Extract input from a structure."""
    if isinstance(obj, Input):
        found.add(obj.name)
        return

    if isinstance(obj, list):
        for val in obj:
            _extract_inputs(val, found)
        return

    if isinstance(obj, dict):
        for val in obj.values():
            _extract_inputs(val, found)
        return


def substitute(obj: Any, substitutions: dict[str, Any]) -> Any:
    """Substitute values."""
    if isinstance(obj, Input):
        if obj.name not in substitutions:
            raise UndefinedSubstitution(obj.name)
        return substitutions[obj.name]

    if isinstance(obj, list):
        return [substitute(val, substitutions) for val in obj]

    if isinstance(obj, dict):
        return {key: substitute(val, substitutions) for key, val in obj.items()}

    return obj
__all__ = ["Input", "UndefinedSubstitution", "extract_inputs", "substitute"]
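Input substitution (used for blueprint !input placeholders) also moves to annotatedyaml.input; the removed functions above document its semantics. A short sketch exercising the new import path, assuming those semantics are preserved by the library:

    from annotatedyaml.input import UndefinedSubstitution, extract_inputs, substitute
    from annotatedyaml.objects import Input

    config = {"trigger": {"entity_id": Input("motion_sensor")}}
    print(extract_inputs(config))  # {'motion_sensor'}
    print(substitute(config, {"motion_sensor": "binary_sensor.hallway"}))
    try:
        substitute(config, {})  # no value supplied for the input
    except UndefinedSubstitution as err:
        print(err)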
@@ -2,157 +2,37 @@

from __future__ import annotations

from collections.abc import Callable, Iterator
import fnmatch
from io import StringIO, TextIOWrapper
import logging
from io import StringIO
import os
from pathlib import Path
from typing import Any, TextIO, overload
from typing import TextIO

from annotatedyaml import YAMLException, YamlTypeError
from annotatedyaml.loader import (
    HAS_C_LOADER,
    JSON_TYPE,
    LoaderType,
    Secrets,
    add_constructor,
    load_yaml as load_annotated_yaml,
    load_yaml_dict as load_annotated_yaml_dict,
    parse_yaml as parse_annotated_yaml,
    secret_yaml as annotated_secret_yaml,
)
import yaml

try:
    from yaml import CSafeLoader as FastestAvailableSafeLoader

    HAS_C_LOADER = True
except ImportError:
    HAS_C_LOADER = False
    from yaml import (  # type: ignore[assignment]
        SafeLoader as FastestAvailableSafeLoader,
    )

from propcache.api import cached_property

from homeassistant.exceptions import HomeAssistantError

from .const import SECRET_YAML
from .objects import Input, NodeDictClass, NodeListClass, NodeStrClass

# mypy: allow-untyped-calls, no-warn-return-any

JSON_TYPE = list | dict | str

_LOGGER = logging.getLogger(__name__)


class YamlTypeError(HomeAssistantError):
    """Raised by load_yaml_dict if top level data is not a dict."""


class Secrets:
    """Store secrets while loading YAML."""

    def __init__(self, config_dir: Path) -> None:
        """Initialize secrets."""
        self.config_dir = config_dir
        self._cache: dict[Path, dict[str, str]] = {}

    def get(self, requester_path: str, secret: str) -> str:
        """Return the value of a secret."""
        current_path = Path(requester_path)

        secret_dir = current_path
        while True:
            secret_dir = secret_dir.parent

            try:
                secret_dir.relative_to(self.config_dir)
            except ValueError:
                # We went above the config dir
                break

            secrets = self._load_secret_yaml(secret_dir)

            if secret in secrets:
                _LOGGER.debug(
                    "Secret %s retrieved from secrets.yaml in folder %s",
                    secret,
                    secret_dir,
                )
                return secrets[secret]

        raise HomeAssistantError(f"Secret {secret} not defined")

    def _load_secret_yaml(self, secret_dir: Path) -> dict[str, str]:
        """Load the secrets yaml from path."""
        if (secret_path := secret_dir / SECRET_YAML) in self._cache:
            return self._cache[secret_path]

        _LOGGER.debug("Loading %s", secret_path)
        try:
            secrets = load_yaml(str(secret_path))

            if not isinstance(secrets, dict):
                raise HomeAssistantError("Secrets is not a dictionary")

            if "logger" in secrets:
                logger = str(secrets["logger"]).lower()
                if logger == "debug":
                    _LOGGER.setLevel(logging.DEBUG)
                else:
                    _LOGGER.error(
                        (
                            "Error in secrets.yaml: 'logger: debug' expected, but"
                            " 'logger: %s' found"
                        ),
                        logger,
                    )
                del secrets["logger"]
        except FileNotFoundError:
            secrets = {}

        self._cache[secret_path] = secrets

        return secrets


class _LoaderMixin:
    """Mixin class with extensions for YAML loader."""

    name: str
    stream: Any

    @cached_property
    def get_name(self) -> str:
        """Get the name of the loader."""
        return self.name

    @cached_property
    def get_stream_name(self) -> str:
        """Get the name of the stream."""
        return getattr(self.stream, "name", "")


class FastSafeLoader(FastestAvailableSafeLoader, _LoaderMixin):
    """The fastest available safe loader, either C or Python."""

    def __init__(self, stream: Any, secrets: Secrets | None = None) -> None:
        """Initialize a safe line loader."""
        self.stream = stream

        # Set name in same way as the Python loader does in yaml.reader.__init__
        if isinstance(stream, str):
            self.name = "<unicode string>"
        elif isinstance(stream, bytes):
            self.name = "<byte string>"
        else:
            self.name = getattr(stream, "name", "<file>")

        super().__init__(stream)
        self.secrets = secrets


class PythonSafeLoader(yaml.SafeLoader, _LoaderMixin):
    """Python safe loader."""

    def __init__(self, stream: Any, secrets: Secrets | None = None) -> None:
        """Initialize a safe line loader."""
        super().__init__(stream)
        self.secrets = secrets


type LoaderType = FastSafeLoader | PythonSafeLoader
__all__ = [
    "HAS_C_LOADER",
    "JSON_TYPE",
    "Secrets",
    "YamlTypeError",
    "add_constructor",
    "load_yaml",
    "load_yaml_dict",
    "parse_yaml",
    "secret_yaml",
]


def load_yaml(
@@ -164,15 +44,9 @@ def load_yaml(
    except for FileNotFoundError which will be re-raised.
    """
    try:
        with open(fname, encoding="utf-8") as conf_file:
            return parse_yaml(conf_file, secrets)
    except UnicodeDecodeError as exc:
        _LOGGER.error("Unable to read file %s: %s", fname, exc)
        raise HomeAssistantError(exc) from exc
    except FileNotFoundError:
        raise
    except OSError as exc:
        raise HomeAssistantError(exc) from exc
        return load_annotated_yaml(fname, secrets)
    except YAMLException as exc:
        raise HomeAssistantError(str(exc)) from exc


def load_yaml_dict(
@@ -183,320 +57,27 @@ def load_yaml_dict(
    Raise if the top level is not a dict.
    Return an empty dict if the file is empty.
    """
    loaded_yaml = load_yaml(fname, secrets)
    if loaded_yaml is None:
        loaded_yaml = {}
    if not isinstance(loaded_yaml, dict):
        raise YamlTypeError(f"YAML file {fname} does not contain a dict")
    return loaded_yaml
    try:
        return load_annotated_yaml_dict(fname, secrets)
    except YamlTypeError:
        raise
    except YAMLException as exc:
        raise HomeAssistantError(str(exc)) from exc


def parse_yaml(
    content: str | TextIO | StringIO, secrets: Secrets | None = None
) -> JSON_TYPE:
    """Parse YAML with the fastest available loader."""
    if not HAS_C_LOADER:
        return _parse_yaml_python(content, secrets)
    try:
        return _parse_yaml(FastSafeLoader, content, secrets)
    except yaml.YAMLError:
        # Loading failed, so we now load with the Python loader which has more
        # readable exceptions
        if isinstance(content, (StringIO, TextIO, TextIOWrapper)):
            # Rewind the stream so we can try again
            content.seek(0, 0)
        return _parse_yaml_python(content, secrets)


def _parse_yaml_python(
    content: str | TextIO | StringIO, secrets: Secrets | None = None
) -> JSON_TYPE:
    """Parse YAML with the python loader (this is very slow)."""
    try:
        return _parse_yaml(PythonSafeLoader, content, secrets)
    except yaml.YAMLError as exc:
        _LOGGER.error(str(exc))
        raise HomeAssistantError(exc) from exc


def _parse_yaml(
    loader: type[FastSafeLoader | PythonSafeLoader],
    content: str | TextIO,
    secrets: Secrets | None = None,
) -> JSON_TYPE:
    """Load a YAML file."""
    return yaml.load(content, Loader=lambda stream: loader(stream, secrets))  # type: ignore[arg-type]


@overload
def _add_reference(
    obj: list | NodeListClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeListClass: ...


@overload
def _add_reference(
    obj: str | NodeStrClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeStrClass: ...


@overload
def _add_reference(
    obj: dict | NodeDictClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeDictClass: ...


def _add_reference(
    obj: dict | list | str | NodeDictClass | NodeListClass | NodeStrClass,
    loader: LoaderType,
    node: yaml.nodes.Node,
) -> NodeDictClass | NodeListClass | NodeStrClass:
    """Add file reference information to an object."""
    if isinstance(obj, list):
        obj = NodeListClass(obj)
    elif isinstance(obj, str):
        obj = NodeStrClass(obj)
    elif isinstance(obj, dict):
        obj = NodeDictClass(obj)
    return _add_reference_to_node_class(obj, loader, node)


@overload
def _add_reference_to_node_class(
    obj: NodeListClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeListClass: ...


@overload
def _add_reference_to_node_class(
    obj: NodeStrClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeStrClass: ...


@overload
def _add_reference_to_node_class(
    obj: NodeDictClass, loader: LoaderType, node: yaml.nodes.Node
) -> NodeDictClass: ...


def _add_reference_to_node_class(
    obj: NodeDictClass | NodeListClass | NodeStrClass,
    loader: LoaderType,
    node: yaml.nodes.Node,
) -> NodeDictClass | NodeListClass | NodeStrClass:
    """Add file reference information to a node class object."""
    try:  # suppress is much slower
        obj.__config_file__ = loader.get_name
        obj.__line__ = node.start_mark.line + 1
    except AttributeError:
        pass
    return obj


def _raise_if_no_value[NodeT: yaml.nodes.Node, _R](
    func: Callable[[LoaderType, NodeT], _R],
) -> Callable[[LoaderType, NodeT], _R]:
    def wrapper(loader: LoaderType, node: NodeT) -> _R:
        if not node.value:
            raise HomeAssistantError(
                f"{node.start_mark}: {node.tag} needs an argument."
            )
        return func(loader, node)

    return wrapper


@_raise_if_no_value
def _include_yaml(loader: LoaderType, node: yaml.nodes.Node) -> JSON_TYPE:
    """Load another YAML file and embed it using the !include tag.

    Example:
        device_tracker: !include device_tracker.yaml

    """
    fname = os.path.join(os.path.dirname(loader.get_name), node.value)
    try:
        loaded_yaml = load_yaml(fname, loader.secrets)
        if loaded_yaml is None:
            loaded_yaml = NodeDictClass()
        return _add_reference(loaded_yaml, loader, node)
    except FileNotFoundError as exc:
        raise HomeAssistantError(
            f"{node.start_mark}: Unable to read file {fname}"
        ) from exc


def _is_file_valid(name: str) -> bool:
    """Decide if a file is valid."""
    return not name.startswith(".")


def _find_files(directory: str, pattern: str) -> Iterator[str]:
    """Recursively load files in a directory."""
    for root, dirs, files in os.walk(directory, topdown=True):
        dirs[:] = [d for d in dirs if _is_file_valid(d)]
        for basename in sorted(files):
            if _is_file_valid(basename) and fnmatch.fnmatch(basename, pattern):
                filename = os.path.join(root, basename)
                yield filename


@_raise_if_no_value
def _include_dir_named_yaml(loader: LoaderType, node: yaml.nodes.Node) -> NodeDictClass:
    """Load multiple files from directory as a dictionary."""
    mapping = NodeDictClass()
    loc = os.path.join(os.path.dirname(loader.get_name), node.value)
    for fname in _find_files(loc, "*.yaml"):
        filename = os.path.splitext(os.path.basename(fname))[0]
        if os.path.basename(fname) == SECRET_YAML:
            continue
        loaded_yaml = load_yaml(fname, loader.secrets)
        if loaded_yaml is None:
            # Special case, an empty file included by !include_dir_named is treated
            # as an empty dictionary
            loaded_yaml = NodeDictClass()
        mapping[filename] = loaded_yaml
    return _add_reference_to_node_class(mapping, loader, node)


@_raise_if_no_value
def _include_dir_merge_named_yaml(
    loader: LoaderType, node: yaml.nodes.Node
) -> NodeDictClass:
    """Load multiple files from directory as a merged dictionary."""
    mapping = NodeDictClass()
    loc = os.path.join(os.path.dirname(loader.get_name), node.value)
    for fname in _find_files(loc, "*.yaml"):
        if os.path.basename(fname) == SECRET_YAML:
            continue
        loaded_yaml = load_yaml(fname, loader.secrets)
        if isinstance(loaded_yaml, dict):
            mapping.update(loaded_yaml)
    return _add_reference_to_node_class(mapping, loader, node)


@_raise_if_no_value
def _include_dir_list_yaml(
    loader: LoaderType, node: yaml.nodes.Node
) -> list[JSON_TYPE]:
    """Load multiple files from directory as a list."""
    loc = os.path.join(os.path.dirname(loader.get_name), node.value)
    return [
        loaded_yaml
        for f in _find_files(loc, "*.yaml")
        if os.path.basename(f) != SECRET_YAML
        and (loaded_yaml := load_yaml(f, loader.secrets)) is not None
    ]


@_raise_if_no_value
def _include_dir_merge_list_yaml(
    loader: LoaderType, node: yaml.nodes.Node
) -> JSON_TYPE:
    """Load multiple files from directory as a merged list."""
    loc: str = os.path.join(os.path.dirname(loader.get_name), node.value)
    merged_list: list[JSON_TYPE] = []
    for fname in _find_files(loc, "*.yaml"):
        if os.path.basename(fname) == SECRET_YAML:
            continue
        loaded_yaml = load_yaml(fname, loader.secrets)
        if isinstance(loaded_yaml, list):
            merged_list.extend(loaded_yaml)
    return _add_reference(merged_list, loader, node)


def _handle_mapping_tag(
    loader: LoaderType, node: yaml.nodes.MappingNode
) -> NodeDictClass:
    """Load YAML mappings into an ordered dictionary to preserve key order."""
    loader.flatten_mapping(node)
    nodes = loader.construct_pairs(node)

    seen: dict = {}
    for (key, _), (child_node, _) in zip(nodes, node.value, strict=False):
        line = child_node.start_mark.line

        try:
            hash(key)
        except TypeError as exc:
            fname = loader.get_stream_name
            raise yaml.MarkedYAMLError(
                context=f'invalid key: "{key}"',
                context_mark=yaml.Mark(
                    fname,
                    0,
                    line,
                    -1,
                    None,
                    None,  # type: ignore[arg-type]
                ),
            ) from exc

        if key in seen:
            fname = loader.get_stream_name
            _LOGGER.warning(
                'YAML file %s contains duplicate key "%s". Check lines %d and %d',
                fname,
                key,
                seen[key],
                line,
            )
        seen[key] = line

    return _add_reference_to_node_class(NodeDictClass(nodes), loader, node)


def _construct_seq(loader: LoaderType, node: yaml.nodes.Node) -> JSON_TYPE:
    """Add line number and file name to Load YAML sequence."""
    (obj,) = loader.construct_yaml_seq(node)
    return _add_reference(obj, loader, node)


def _handle_scalar_tag(
    loader: LoaderType, node: yaml.nodes.ScalarNode
) -> str | int | float | None:
    """Add line number and file name to Load YAML sequence."""
    obj = node.value
    if not isinstance(obj, str):
        return obj
    return _add_reference_to_node_class(NodeStrClass(obj), loader, node)


def _env_var_yaml(loader: LoaderType, node: yaml.nodes.Node) -> str:
    """Load environment variables and embed it into the configuration YAML."""
    args = node.value.split()

    # Check for a default value
    if len(args) > 1:
        return os.getenv(args[0], " ".join(args[1:]))
    if args[0] in os.environ:
        return os.environ[args[0]]
    _LOGGER.error("Environment variable %s not defined", node.value)
    raise HomeAssistantError(node.value)
        return parse_annotated_yaml(content, secrets)
    except YAMLException as exc:
        raise HomeAssistantError(str(exc)) from exc


def secret_yaml(loader: LoaderType, node: yaml.nodes.Node) -> JSON_TYPE:
    """Load secrets and embed it into the configuration YAML."""
    if loader.secrets is None:
        raise HomeAssistantError("Secrets not supported in this YAML file")

    return loader.secrets.get(loader.get_name, node.value)


def add_constructor(tag: Any, constructor: Any) -> None:
    """Add to constructor to all loaders."""
    for yaml_loader in (FastSafeLoader, PythonSafeLoader):
        yaml_loader.add_constructor(tag, constructor)


add_constructor("!include", _include_yaml)
add_constructor(yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _handle_mapping_tag)
add_constructor(yaml.resolver.BaseResolver.DEFAULT_SCALAR_TAG, _handle_scalar_tag)
add_constructor(yaml.resolver.BaseResolver.DEFAULT_SEQUENCE_TAG, _construct_seq)
add_constructor("!env_var", _env_var_yaml)
add_constructor("!secret", secret_yaml)
add_constructor("!include_dir_list", _include_dir_list_yaml)
add_constructor("!include_dir_merge_list", _include_dir_merge_list_yaml)
add_constructor("!include_dir_named", _include_dir_named_yaml)
add_constructor("!include_dir_merge_named", _include_dir_merge_named_yaml)
add_constructor("!input", Input.from_node)
    try:
        return annotated_secret_yaml(loader, node)
    except YAMLException as exc:
        raise HomeAssistantError(str(exc)) from exc
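With the loader hunks above, homeassistant.util.yaml's load_yaml, load_yaml_dict, parse_yaml and secret_yaml become thin wrappers that delegate to annotatedyaml and translate its YAMLException into HomeAssistantError. A hedged sketch of the resulting caller-side behaviour (the broken file path is hypothetical):

    from homeassistant.exceptions import HomeAssistantError
    from homeassistant.util.yaml import load_yaml_dict

    try:
        load_yaml_dict("/config/broken.yaml")
    except HomeAssistantError as err:
        # YAML syntax errors surface here, per the load_yaml_dict wrapper above
        print(f"Config error: {err}")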
@@ -2,52 +2,6 @@

from __future__ import annotations

from dataclasses import dataclass
from typing import Any
from annotatedyaml.objects import Input, NodeDictClass, NodeListClass, NodeStrClass

import voluptuous as vol
from voluptuous.schema_builder import _compile_scalar
import yaml


class NodeListClass(list):
    """Wrapper class to be able to add attributes on a list."""

    __slots__ = ("__config_file__", "__line__")

    __config_file__: str
    __line__: int | str


class NodeStrClass(str):
    """Wrapper class to be able to add attributes on a string."""

    __slots__ = ("__config_file__", "__line__")

    __config_file__: str
    __line__: int | str

    def __voluptuous_compile__(self, schema: vol.Schema) -> Any:
        """Needed because vol.Schema.compile does not handle str subclasses."""
        return _compile_scalar(self)  # type: ignore[no-untyped-call]


class NodeDictClass(dict):
    """Wrapper class to be able to add attributes on a dict."""

    __slots__ = ("__config_file__", "__line__")

    __config_file__: str
    __line__: int | str


@dataclass(slots=True, frozen=True)
class Input:
    """Input that should be substituted."""

    name: str

    @classmethod
    def from_node(cls, loader: yaml.Loader, node: yaml.nodes.Node) -> Input:
        """Create a new placeholder from a node."""
        return cls(node.value)
__all__ = ["Input", "NodeDictClass", "NodeListClass", "NodeStrClass"]
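The node wrapper classes (NodeDictClass, NodeListClass, NodeStrClass) and Input move to annotatedyaml.objects. As the removed code above shows, their purpose is to let loaded values carry __config_file__ and __line__ attributes for error reporting. A hedged sketch, assuming the library keeps those slots:

    from annotatedyaml.objects import NodeStrClass

    value = NodeStrClass("kitchen")
    value.__config_file__ = "/config/configuration.yaml"  # normally set by the loader
    value.__line__ = 12
    print(f"{value} came from {value.__config_file__}:{value.__line__}")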
@@ -33,6 +33,7 @@ dependencies = [
    "aiohttp-fast-zlib==0.2.3",
    "aiohttp-asyncmdnsresolver==0.1.1",
    "aiozoneinfo==0.2.3",
    "annotatedyaml==0.1.1",
    "astral==2.2",
    "async-interrupt==1.2.2",
    "attrs==25.1.0",
requirements.txt (generated)
@@ -10,6 +10,7 @@ aiohttp_cors==0.7.0
aiohttp-fast-zlib==0.2.3
aiohttp-asyncmdnsresolver==0.1.1
aiozoneinfo==0.2.3
annotatedyaml==0.1.1
astral==2.2
async-interrupt==1.2.2
attrs==25.1.0
@@ -29,6 +29,7 @@ from typing import Any, Literal, NoReturn
from unittest.mock import AsyncMock, Mock, patch

from aiohttp.test_utils import unused_port as get_test_instance_port  # noqa: F401
from annotatedyaml import load_yaml_dict, loader as yaml_loader
import pytest
from syrupy import SnapshotAssertion
import voluptuous as vol
@@ -109,7 +110,6 @@ from homeassistant.util.json import (
)
from homeassistant.util.signal_type import SignalType
from homeassistant.util.unit_system import METRIC_SYSTEM
from homeassistant.util.yaml import load_yaml_dict, loader as yaml_loader

from .testing_config.custom_components.test_constant_deprecation import (
    import_deprecated_constant,
@@ -961,7 +961,7 @@ async def test_async_get_all_descriptions_dot_keys(hass: HomeAssistant) -> None:
            side_effect=service._load_services_files,
        ) as proxy_load_services_files,
        patch(
            "homeassistant.util.yaml.loader.load_yaml",
            "annotatedyaml.loader.load_yaml",
            side_effect=load_yaml,
        ) as mock_load_yaml,
    ):
@@ -1033,7 +1033,7 @@ async def test_async_get_all_descriptions_filter(hass: HomeAssistant) -> None:
            side_effect=service._load_services_files,
        ) as proxy_load_services_files,
        patch(
            "homeassistant.util.yaml.loader.load_yaml",
            "annotatedyaml.loader.load_yaml",
            side_effect=load_yaml,
        ) as mock_load_yaml,
    ):
@@ -434,7 +434,7 @@
# name: test_yaml_error[basic]
  '''
  mapping values are not allowed here
    in "configuration.yaml", line 4, column 14
    in "<BASE_PATH>/fixtures/core/config/yaml_errors/basic/configuration.yaml", line 4, column 14
  '''
# ---
# name: test_yaml_error[basic].1
@@ -448,7 +448,7 @@
# name: test_yaml_error[basic_include]
  '''
  mapping values are not allowed here
    in "integrations/iot_domain.yaml", line 3, column 12
    in "<BASE_PATH>/fixtures/core/config/yaml_errors/basic_include/integrations/iot_domain.yaml", line 3, column 12
  '''
# ---
# name: test_yaml_error[basic_include].1
@@ -462,7 +462,7 @@
# name: test_yaml_error[include_dir_list]
  '''
  mapping values are not allowed here
    in "iot_domain/iot_domain_1.yaml", line 3, column 10
    in "<BASE_PATH>/fixtures/core/config/yaml_errors/include_dir_list/iot_domain/iot_domain_1.yaml", line 3, column 10
  '''
# ---
# name: test_yaml_error[include_dir_list].1
@@ -476,7 +476,7 @@
# name: test_yaml_error[include_dir_merge_list]
  '''
  mapping values are not allowed here
    in "iot_domain/iot_domain_1.yaml", line 3, column 12
    in "<BASE_PATH>/fixtures/core/config/yaml_errors/include_dir_merge_list/iot_domain/iot_domain_1.yaml", line 3, column 12
  '''
# ---
# name: test_yaml_error[include_dir_merge_list].1
@@ -490,7 +490,7 @@
# name: test_yaml_error[packages_include_dir_named]
  '''
  mapping values are not allowed here
    in "integrations/adr_0007_1.yaml", line 4, column 9
    in "<BASE_PATH>/fixtures/core/config/yaml_errors/packages_include_dir_named/integrations/adr_0007_1.yaml", line 4, column 9
  '''
# ---
# name: test_yaml_error[packages_include_dir_named].1
@@ -374,7 +374,7 @@ def test_include_dir_merge_named_recursive(mock_walk: Mock) -> None:
    }


@patch("homeassistant.util.yaml.loader.open", create=True)
@patch("annotatedyaml.loader.open", create=True)
@pytest.mark.usefixtures("try_both_loaders")
def test_load_yaml_encoding_error(mock_open: Mock) -> None:
    """Test raising a UnicodeDecodeError."""
@@ -598,7 +598,7 @@ def test_load_yaml_wrap_oserror(
) -> None:
    """Test load_yaml wraps OSError in HomeAssistantError."""
    with (
        patch("homeassistant.util.yaml.loader.open", side_effect=open_exception),
        patch("annotatedyaml.loader.open", side_effect=open_exception),
        pytest.raises(load_yaml_exception),
    ):
        yaml_loader.load_yaml("bla")