Jinja expressions in configs (Take #3) (#8955)

This commit is contained in:
Javier Peletier 2025-07-01 04:57:00 +02:00 committed by GitHub
parent 27c745d5a1
commit 8c34b72b62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 486 additions and 24 deletions

View File

@ -74,7 +74,7 @@ BASE_SCHEMA = cv.All(
{
cv.Required(CONF_PATH): validate_yaml_filename,
cv.Optional(CONF_VARS, default={}): cv.Schema(
{cv.string: cv.string}
{cv.string: object}
),
}
),
@ -148,7 +148,6 @@ def _process_base_package(config: dict) -> dict:
raise cv.Invalid(
f"Current ESPHome Version is too old to use this package: {ESPHOME_VERSION} < {min_version}"
)
vars = {k: str(v) for k, v in vars.items()}
new_yaml = yaml_util.substitute_vars(new_yaml, vars)
packages[f"{filename}{idx}"] = new_yaml
except EsphomeError as e:

View File

@ -5,6 +5,13 @@ from esphome.config_helpers import Extend, Remove, merge_config
import esphome.config_validation as cv
from esphome.const import CONF_SUBSTITUTIONS, VALID_SUBSTITUTIONS_CHARACTERS
from esphome.yaml_util import ESPHomeDataBase, make_data_base
from .jinja import (
Jinja,
JinjaStr,
has_jinja,
TemplateError,
TemplateRuntimeError,
)
CODEOWNERS = ["@esphome/core"]
_LOGGER = logging.getLogger(__name__)
@ -28,7 +35,7 @@ def validate_substitution_key(value):
CONFIG_SCHEMA = cv.Schema(
{
validate_substitution_key: cv.string_strict,
validate_substitution_key: object,
}
)
@ -37,7 +44,42 @@ async def to_code(config):
pass
def _expand_substitutions(substitutions, value, path, ignore_missing):
def _expand_jinja(value, orig_value, path, jinja, ignore_missing):
if has_jinja(value):
# If the original value passed in to this function is a JinjaStr, it means it contains an unresolved
# Jinja expression from a previous pass.
if isinstance(orig_value, JinjaStr):
# Rebuild the JinjaStr in case it was lost while replacing substitutions.
value = JinjaStr(value, orig_value.upvalues)
try:
# Invoke the jinja engine to evaluate the expression.
value, err = jinja.expand(value)
if err is not None:
if not ignore_missing and "password" not in path:
_LOGGER.warning(
"Found '%s' (see %s) which looks like an expression,"
" but could not resolve all the variables: %s",
value,
"->".join(str(x) for x in path),
err.message,
)
except (
TemplateError,
TemplateRuntimeError,
RuntimeError,
ArithmeticError,
AttributeError,
TypeError,
) as err:
raise cv.Invalid(
f"{type(err).__name__} Error evaluating jinja expression '{value}': {str(err)}."
f" See {'->'.join(str(x) for x in path)}",
path,
)
return value
def _expand_substitutions(substitutions, value, path, jinja, ignore_missing):
if "$" not in value:
return value
@ -47,7 +89,8 @@ def _expand_substitutions(substitutions, value, path, ignore_missing):
while True:
m = cv.VARIABLE_PROG.search(value, i)
if not m:
# Nothing more to match. Done
# No more variable substitutions found. See if the remainder looks like a jinja template
value = _expand_jinja(value, orig_value, path, jinja, ignore_missing)
break
i, j = m.span(0)
@ -67,8 +110,15 @@ def _expand_substitutions(substitutions, value, path, ignore_missing):
continue
sub = substitutions[name]
if i == 0 and j == len(value):
# The variable spans the whole expression, e.g., "${varName}". Return its resolved value directly
# to conserve its type.
value = sub
break
tail = value[j:]
value = value[:i] + sub
value = value[:i] + str(sub)
i = len(value)
value += tail
@ -77,36 +127,40 @@ def _expand_substitutions(substitutions, value, path, ignore_missing):
if isinstance(orig_value, ESPHomeDataBase):
# even though string can get larger or smaller, the range should point
# to original document marks
return make_data_base(value, orig_value)
value = make_data_base(value, orig_value)
return value
def _substitute_item(substitutions, item, path, ignore_missing):
def _substitute_item(substitutions, item, path, jinja, ignore_missing):
if isinstance(item, list):
for i, it in enumerate(item):
sub = _substitute_item(substitutions, it, path + [i], ignore_missing)
sub = _substitute_item(substitutions, it, path + [i], jinja, ignore_missing)
if sub is not None:
item[i] = sub
elif isinstance(item, dict):
replace_keys = []
for k, v in item.items():
if path or k != CONF_SUBSTITUTIONS:
sub = _substitute_item(substitutions, k, path + [k], ignore_missing)
sub = _substitute_item(
substitutions, k, path + [k], jinja, ignore_missing
)
if sub is not None:
replace_keys.append((k, sub))
sub = _substitute_item(substitutions, v, path + [k], ignore_missing)
sub = _substitute_item(substitutions, v, path + [k], jinja, ignore_missing)
if sub is not None:
item[k] = sub
for old, new in replace_keys:
item[new] = merge_config(item.get(old), item.get(new))
del item[old]
elif isinstance(item, str):
sub = _expand_substitutions(substitutions, item, path, ignore_missing)
if sub != item:
sub = _expand_substitutions(substitutions, item, path, jinja, ignore_missing)
if isinstance(sub, JinjaStr) or sub != item:
return sub
elif isinstance(item, (core.Lambda, Extend, Remove)):
sub = _expand_substitutions(substitutions, item.value, path, ignore_missing)
sub = _expand_substitutions(
substitutions, item.value, path, jinja, ignore_missing
)
if sub != item:
item.value = sub
return None
@ -116,11 +170,11 @@ def do_substitution_pass(config, command_line_substitutions, ignore_missing=Fals
if CONF_SUBSTITUTIONS not in config and not command_line_substitutions:
return
substitutions = config.get(CONF_SUBSTITUTIONS)
if substitutions is None:
substitutions = command_line_substitutions
elif command_line_substitutions:
substitutions = {**substitutions, **command_line_substitutions}
# Merge substitutions in config, overriding with substitutions coming from command line:
substitutions = {
**config.get(CONF_SUBSTITUTIONS, {}),
**(command_line_substitutions or {}),
}
with cv.prepend_path("substitutions"):
if not isinstance(substitutions, dict):
raise cv.Invalid(
@ -133,7 +187,7 @@ def do_substitution_pass(config, command_line_substitutions, ignore_missing=Fals
sub = validate_substitution_key(key)
if sub != key:
replace_keys.append((key, sub))
substitutions[key] = cv.string_strict(value)
substitutions[key] = value
for old, new in replace_keys:
substitutions[new] = substitutions[old]
del substitutions[old]
@ -141,4 +195,7 @@ def do_substitution_pass(config, command_line_substitutions, ignore_missing=Fals
config[CONF_SUBSTITUTIONS] = substitutions
# Move substitutions to the first place to replace substitutions in them correctly
config.move_to_end(CONF_SUBSTITUTIONS, False)
_substitute_item(substitutions, config, [], ignore_missing)
# Create a Jinja environment that will consider substitutions in scope:
jinja = Jinja(substitutions)
_substitute_item(substitutions, config, [], jinja, ignore_missing)

View File

@ -0,0 +1,99 @@
import logging
import math
import re
import jinja2 as jinja
from jinja2.nativetypes import NativeEnvironment
TemplateError = jinja.TemplateError
TemplateSyntaxError = jinja.TemplateSyntaxError
TemplateRuntimeError = jinja.TemplateRuntimeError
UndefinedError = jinja.UndefinedError
Undefined = jinja.Undefined
_LOGGER = logging.getLogger(__name__)
DETECT_JINJA = r"(\$\{)"
detect_jinja_re = re.compile(
r"<%.+?%>" # Block form expression: <% ... %>
r"|\$\{[^}]+\}", # Braced form expression: ${ ... }
flags=re.MULTILINE,
)
def has_jinja(st):
return detect_jinja_re.search(st) is not None
class JinjaStr(str):
"""
Wraps a string containing an unresolved Jinja expression,
storing the variables visible to it when it failed to resolve.
For example, an expression inside a package, `${ A * B }` may fail
to resolve at package parsing time if `A` is a local package var
but `B` is a substitution defined in the root yaml.
Therefore, we store the value of `A` as an upvalue bound
to the original string so we may be able to resolve `${ A * B }`
later in the main substitutions pass.
"""
def __new__(cls, value: str, upvalues=None):
obj = super().__new__(cls, value)
obj.upvalues = upvalues or {}
return obj
def __init__(self, value: str, upvalues=None):
self.upvalues = upvalues or {}
class Jinja:
"""
Wraps a Jinja environment
"""
def __init__(self, context_vars):
self.env = NativeEnvironment(
trim_blocks=True,
lstrip_blocks=True,
block_start_string="<%",
block_end_string="%>",
line_statement_prefix="#",
line_comment_prefix="##",
variable_start_string="${",
variable_end_string="}",
undefined=jinja.StrictUndefined,
)
self.env.add_extension("jinja2.ext.do")
self.env.globals["math"] = math # Inject entire math module
self.context_vars = {**context_vars}
self.env.globals = {**self.env.globals, **self.context_vars}
def expand(self, content_str):
"""
Renders a string that may contain Jinja expressions or statements
Returns the resulting processed string if all values could be resolved.
Otherwise, it returns a tagged (JinjaStr) string that captures variables
in scope (upvalues), like a closure for later evaluation.
"""
result = None
override_vars = {}
if isinstance(content_str, JinjaStr):
# If `value` is already a JinjaStr, it means we are trying to evaluate it again
# in a parent pass.
# Hopefully, all required variables are visible now.
override_vars = content_str.upvalues
try:
template = self.env.from_string(content_str)
result = template.render(override_vars)
if isinstance(result, Undefined):
# This happens when the expression is simply an undefined variable. Jinja does not
# raise an exception, instead we get "Undefined".
# Trigger an UndefinedError exception so we skip to below.
print("" + result)
except (TemplateSyntaxError, UndefinedError) as err:
# `content_str` contains a Jinja expression that refers to a variable that is undefined
# in this scope. Perhaps it refers to a root substitution that is not visible yet.
# Therefore, return the original `content_str` as a JinjaStr, which contains the variables
# that are actually visible to it at this point to postpone evaluation.
return JinjaStr(content_str, {**self.context_vars, **override_vars}), err
return result, None

View File

@ -789,7 +789,6 @@ def validate_config(
result.add_output_path([CONF_SUBSTITUTIONS], CONF_SUBSTITUTIONS)
try:
substitutions.do_substitution_pass(config, command_line_substitutions)
substitutions.do_substitution_pass(config, command_line_substitutions)
except vol.Invalid as err:
result.add_error(err)
return result

View File

@ -292,8 +292,6 @@ class ESPHomeLoaderMixin:
if file is None:
raise yaml.MarkedYAMLError("Must include 'file'", node.start_mark)
vars = fields.get(CONF_VARS)
if vars:
vars = {k: str(v) for k, v in vars.items()}
return file, vars
if isinstance(node, yaml.nodes.MappingNode):

View File

@ -21,6 +21,7 @@ esphome-glyphsets==0.2.0
pillow==10.4.0
cairosvg==2.8.2
freetype-py==2.5.1
jinja2==3.1.6
# esp-idf requires this, but doesn't bundle it by default
# https://github.com/espressif/esp-idf/blob/220590d599e134d7a5e7f1e683cc4550349ffbf8/requirements.txt#L24

View File

@ -0,0 +1 @@
*.received.yaml

View File

@ -0,0 +1,19 @@
substitutions:
var1: '1'
var2: '2'
var21: '79'
esphome:
name: test
test_list:
- '1'
- '1'
- '1'
- '1'
- 'Values: 1 2'
- 'Value: 79'
- 1 + 2
- 1 * 2
- 'Undefined var: ${undefined_var}'
- ${undefined_var}
- $undefined_var
- ${ undefined_var }

View File

@ -0,0 +1,21 @@
esphome:
name: test
substitutions:
var1: "1"
var2: "2"
var21: "79"
test_list:
- "$var1"
- "${var1}"
- $var1
- ${var1}
- "Values: $var1 ${var2}"
- "Value: ${var2${var1}}"
- "$var1 + $var2"
- "${ var1 } * ${ var2 }"
- "Undefined var: ${undefined_var}"
- ${undefined_var}
- $undefined_var
- ${ undefined_var }

View File

@ -0,0 +1,15 @@
substitutions:
var1: '1'
var2: '2'
a: alpha
test_list:
- values:
- var1: '1'
- a: A
- b: B-default
- c: The value of C is C
- values:
- var1: '1'
- a: alpha
- b: beta
- c: The value of C is $c

View File

@ -0,0 +1,15 @@
substitutions:
var1: "1"
var2: "2"
a: "alpha"
test_list:
- !include
file: inc1.yaml
vars:
a: "A"
c: "C"
- !include
file: inc1.yaml
vars:
b: "beta"

View File

@ -0,0 +1,24 @@
substitutions:
width: 7
height: 8
enabled: true
pin: &id001
number: 18
inverted: true
area: 25
numberOne: 1
var1: 79
test_list:
- The area is 56
- 56
- 56 + 1
- ENABLED
- list:
- 7
- 8
- width: 7
height: 8
- *id001
- The pin number is 18
- The square root is: 5.0
- The number is 80

View File

@ -0,0 +1,22 @@
substitutions:
width: 7
height: 8
enabled: true
pin:
number: 18
inverted: true
area: 25
numberOne: 1
var1: 79
test_list:
- "The area is ${width * height}"
- ${width * height}
- ${width * height} + 1
- ${enabled and "ENABLED" or "DISABLED"}
- list: ${ [width, height] }
- "${ {'width': width, 'height': height} }"
- ${pin}
- The pin number is ${pin.number}
- The square root is: ${math.sqrt(area)}
- The number is ${var${numberOne} + 1}

View File

@ -0,0 +1,17 @@
substitutions:
B: 5
var7: 79
package_result:
- The value of A*B is 35, where A is a package var and B is a substitution in the
root file
- Double substitution also works; the value of var7 is 79, where A is a package
var
local_results:
- The value of B is 5
- 'You will see, however, that
${A} is not substituted here, since
it is out of scope.
'

View File

@ -0,0 +1,16 @@
substitutions:
B: 5
var7: 79
packages:
closures_package: !include
file: closures_package.yaml
vars:
A: 7
local_results:
- The value of B is ${B}
- |
You will see, however, that
${A} is not substituted here, since
it is out of scope.

View File

@ -0,0 +1,5 @@
display:
- platform: ili9xxx
dimensions:
width: 960
height: 544

View File

@ -0,0 +1,7 @@
# main.yaml
packages:
my_display: !include
file: display.yaml
vars:
high_dpi: true
native_height: 272

View File

@ -0,0 +1,3 @@
package_result:
- The value of A*B is ${A * B}, where A is a package var and B is a substitution in the root file
- Double substitution also works; the value of var7 is ${var$A}, where A is a package var

View File

@ -0,0 +1,11 @@
# display.yaml
defaults:
native_width: 480
native_height: 480
display:
- platform: ili9xxx
dimensions:
width: ${high_dpi and native_width * 2 or native_width}
height: ${high_dpi and native_height * 2 or native_height}

View File

@ -0,0 +1,8 @@
defaults:
b: "B-default"
values:
- var1: $var1
- a: $a
- b: ${b}
- c: The value of C is $c

View File

@ -0,0 +1,125 @@
import glob
import logging
import os
from esphome import yaml_util
from esphome.components import substitutions
from esphome.const import CONF_PACKAGES
_LOGGER = logging.getLogger(__name__)
# Set to True for dev mode behavior
# This will generate the expected version of the test files.
DEV_MODE = False
def sort_dicts(obj):
"""Recursively sort dictionaries for order-insensitive comparison."""
if isinstance(obj, dict):
return {k: sort_dicts(obj[k]) for k in sorted(obj)}
elif isinstance(obj, list):
# Lists are not sorted; we preserve order
return [sort_dicts(i) for i in obj]
else:
return obj
def dict_diff(a, b, path=""):
"""Recursively find differences between two dict/list structures."""
diffs = []
if isinstance(a, dict) and isinstance(b, dict):
a_keys = set(a)
b_keys = set(b)
for key in a_keys - b_keys:
diffs.append(f"{path}/{key} only in actual")
for key in b_keys - a_keys:
diffs.append(f"{path}/{key} only in expected")
for key in a_keys & b_keys:
diffs.extend(dict_diff(a[key], b[key], f"{path}/{key}"))
elif isinstance(a, list) and isinstance(b, list):
min_len = min(len(a), len(b))
for i in range(min_len):
diffs.extend(dict_diff(a[i], b[i], f"{path}[{i}]"))
if len(a) > len(b):
for i in range(min_len, len(a)):
diffs.append(f"{path}[{i}] only in actual: {a[i]!r}")
elif len(b) > len(a):
for i in range(min_len, len(b)):
diffs.append(f"{path}[{i}] only in expected: {b[i]!r}")
else:
if a != b:
diffs.append(f"\t{path}: actual={a!r} expected={b!r}")
return diffs
def write_yaml(path, data):
with open(path, "w", encoding="utf-8") as f:
f.write(yaml_util.dump(data))
def test_substitutions_fixtures(fixture_path):
base_dir = fixture_path / "substitutions"
sources = sorted(glob.glob(str(base_dir / "*.input.yaml")))
assert sources, f"No input YAML files found in {base_dir}"
failures = []
for source_path in sources:
try:
expected_path = source_path.replace(".input.yaml", ".approved.yaml")
test_case = os.path.splitext(os.path.basename(source_path))[0].replace(
".input", ""
)
# Load using ESPHome's YAML loader
config = yaml_util.load_yaml(source_path)
if CONF_PACKAGES in config:
from esphome.components.packages import do_packages_pass
config = do_packages_pass(config)
substitutions.do_substitution_pass(config, None)
# Also load expected using ESPHome's loader, or use {} if missing and DEV_MODE
if os.path.isfile(expected_path):
expected = yaml_util.load_yaml(expected_path)
elif DEV_MODE:
expected = {}
else:
assert os.path.isfile(expected_path), (
f"Expected file missing: {expected_path}"
)
# Sort dicts only (not lists) for comparison
got_sorted = sort_dicts(config)
expected_sorted = sort_dicts(expected)
if got_sorted != expected_sorted:
diff = "\n".join(dict_diff(got_sorted, expected_sorted))
msg = (
f"Substitution result mismatch for {os.path.basename(source_path)}\n"
f"Diff:\n{diff}\n\n"
f"Got: {got_sorted}\n"
f"Expected: {expected_sorted}"
)
# Write out the received file when test fails
if DEV_MODE:
received_path = os.path.join(
os.path.dirname(source_path), f"{test_case}.received.yaml"
)
write_yaml(received_path, config)
print(msg)
failures.append(msg)
else:
raise AssertionError(msg)
except Exception as err:
_LOGGER.error("Error in test file %s", source_path)
raise err
if DEV_MODE and failures:
print(f"\n{len(failures)} substitution test case(s) failed.")
if DEV_MODE:
_LOGGER.error("Tests passed, but Dev mode is enabled.")
assert not DEV_MODE # make sure DEV_MODE is disabled after you are finished.