Restructure template engine, add crypto & base64 Jinja extension (#152261)

This commit is contained in:
Franck Nijhof
2025-09-13 22:21:29 +02:00
committed by GitHub
parent 0e2c2ad355
commit 70df7b8503
11 changed files with 357 additions and 207 deletions

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from ast import literal_eval from ast import literal_eval
import asyncio import asyncio
import base64
import collections.abc import collections.abc
from collections.abc import Callable, Generator, Iterable, MutableSequence from collections.abc import Callable, Generator, Iterable, MutableSequence
from contextlib import AbstractContextManager from contextlib import AbstractContextManager
@@ -12,7 +11,6 @@ from contextvars import ContextVar
from copy import deepcopy from copy import deepcopy
from datetime import date, datetime, time, timedelta from datetime import date, datetime, time, timedelta
from functools import cache, lru_cache, partial, wraps from functools import cache, lru_cache, partial, wraps
import hashlib
import json import json
import logging import logging
import math import math
@@ -71,6 +69,19 @@ from homeassistant.core import (
valid_entity_id, valid_entity_id,
) )
from homeassistant.exceptions import TemplateError from homeassistant.exceptions import TemplateError
from homeassistant.helpers import (
area_registry as ar,
device_registry as dr,
entity_registry as er,
floor_registry as fr,
issue_registry as ir,
label_registry as lr,
location as loc_helper,
)
from homeassistant.helpers.deprecation import deprecated_function
from homeassistant.helpers.singleton import singleton
from homeassistant.helpers.translation import async_translate_state
from homeassistant.helpers.typing import TemplateVarsType
from homeassistant.loader import bind_hass from homeassistant.loader import bind_hass
from homeassistant.util import ( from homeassistant.util import (
convert, convert,
@@ -84,20 +95,6 @@ from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
from homeassistant.util.read_only_dict import ReadOnlyDict from homeassistant.util.read_only_dict import ReadOnlyDict
from homeassistant.util.thread import ThreadWithException from homeassistant.util.thread import ThreadWithException
from . import (
area_registry,
device_registry,
entity_registry,
floor_registry as fr,
issue_registry,
label_registry,
location as loc_helper,
)
from .deprecation import deprecated_function
from .singleton import singleton
from .translation import async_translate_state
from .typing import TemplateVarsType
if TYPE_CHECKING: if TYPE_CHECKING:
from _typeshed import OptExcInfo from _typeshed import OptExcInfo
@@ -210,7 +207,7 @@ def async_setup(hass: HomeAssistant) -> bool:
if new_size > current_size: if new_size > current_size:
lru.set_size(new_size) lru.set_size(new_size)
from .event import async_track_time_interval # noqa: PLC0415 from homeassistant.helpers.event import async_track_time_interval # noqa: PLC0415
cancel = async_track_time_interval( cancel = async_track_time_interval(
hass, _async_adjust_lru_sizes, timedelta(minutes=10) hass, _async_adjust_lru_sizes, timedelta(minutes=10)
@@ -525,7 +522,10 @@ class Template:
Note: A valid hass instance should always be passed in. The hass parameter Note: A valid hass instance should always be passed in. The hass parameter
will be non optional in Home Assistant Core 2025.10. will be non optional in Home Assistant Core 2025.10.
""" """
from .frame import ReportBehavior, report_usage # noqa: PLC0415 from homeassistant.helpers.frame import ( # noqa: PLC0415
ReportBehavior,
report_usage,
)
if not isinstance(template, str): if not isinstance(template, str):
raise TypeError("Expected template to be a string") raise TypeError("Expected template to be a string")
@@ -973,7 +973,7 @@ class StateTranslated:
state_value = state.state state_value = state.state
domain = state.domain domain = state.domain
device_class = state.attributes.get("device_class") device_class = state.attributes.get("device_class")
entry = entity_registry.async_get(self._hass).async_get(entity_id) entry = er.async_get(self._hass).async_get(entity_id)
platform = None if entry is None else entry.platform platform = None if entry is None else entry.platform
translation_key = None if entry is None else entry.translation_key translation_key = None if entry is None else entry.translation_key
@@ -1274,7 +1274,7 @@ def forgiving_boolean[_T](
"""Try to convert value to a boolean.""" """Try to convert value to a boolean."""
try: try:
# Import here, not at top-level to avoid circular import # Import here, not at top-level to avoid circular import
from . import config_validation as cv # noqa: PLC0415 from homeassistant.helpers import config_validation as cv # noqa: PLC0415
return cv.boolean(value) return cv.boolean(value)
except vol.Invalid: except vol.Invalid:
@@ -1299,7 +1299,7 @@ def result_as_boolean(template_result: Any | None) -> bool:
def expand(hass: HomeAssistant, *args: Any) -> Iterable[State]: def expand(hass: HomeAssistant, *args: Any) -> Iterable[State]:
"""Expand out any groups and zones into entity states.""" """Expand out any groups and zones into entity states."""
# circular import. # circular import.
from . import entity as entity_helper # noqa: PLC0415 from homeassistant.helpers import entity as entity_helper # noqa: PLC0415
search = list(args) search = list(args)
found = {} found = {}
@@ -1341,8 +1341,8 @@ def expand(hass: HomeAssistant, *args: Any) -> Iterable[State]:
def device_entities(hass: HomeAssistant, _device_id: str) -> Iterable[str]: def device_entities(hass: HomeAssistant, _device_id: str) -> Iterable[str]:
"""Get entity ids for entities tied to a device.""" """Get entity ids for entities tied to a device."""
entity_reg = entity_registry.async_get(hass) entity_reg = er.async_get(hass)
entries = entity_registry.async_entries_for_device(entity_reg, _device_id) entries = er.async_entries_for_device(entity_reg, _device_id)
return [entry.entity_id for entry in entries] return [entry.entity_id for entry in entries]
@@ -1360,19 +1360,17 @@ def integration_entities(hass: HomeAssistant, entry_name: str) -> Iterable[str]:
# first try if there are any config entries with a matching title # first try if there are any config entries with a matching title
entities: list[str] = [] entities: list[str] = []
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
for entry in hass.config_entries.async_entries(): for entry in hass.config_entries.async_entries():
if entry.title != entry_name: if entry.title != entry_name:
continue continue
entries = entity_registry.async_entries_for_config_entry( entries = er.async_entries_for_config_entry(ent_reg, entry.entry_id)
ent_reg, entry.entry_id
)
entities.extend(entry.entity_id for entry in entries) entities.extend(entry.entity_id for entry in entries)
if entities: if entities:
return entities return entities
# fallback to just returning all entities for a domain # fallback to just returning all entities for a domain
from .entity import entity_sources # noqa: PLC0415 from homeassistant.helpers.entity import entity_sources # noqa: PLC0415
return [ return [
entity_id entity_id
@@ -1383,7 +1381,7 @@ def integration_entities(hass: HomeAssistant, entry_name: str) -> Iterable[str]:
def config_entry_id(hass: HomeAssistant, entity_id: str) -> str | None: def config_entry_id(hass: HomeAssistant, entity_id: str) -> str | None:
"""Get an config entry ID from an entity ID.""" """Get an config entry ID from an entity ID."""
entity_reg = entity_registry.async_get(hass) entity_reg = er.async_get(hass)
if entity := entity_reg.async_get(entity_id): if entity := entity_reg.async_get(entity_id):
return entity.config_entry_id return entity.config_entry_id
return None return None
@@ -1391,12 +1389,12 @@ def config_entry_id(hass: HomeAssistant, entity_id: str) -> str | None:
def device_id(hass: HomeAssistant, entity_id_or_device_name: str) -> str | None: def device_id(hass: HomeAssistant, entity_id_or_device_name: str) -> str | None:
"""Get a device ID from an entity ID or device name.""" """Get a device ID from an entity ID or device name."""
entity_reg = entity_registry.async_get(hass) entity_reg = er.async_get(hass)
entity = entity_reg.async_get(entity_id_or_device_name) entity = entity_reg.async_get(entity_id_or_device_name)
if entity is not None: if entity is not None:
return entity.device_id return entity.device_id
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
return next( return next(
( (
device_id device_id
@@ -1410,13 +1408,13 @@ def device_id(hass: HomeAssistant, entity_id_or_device_name: str) -> str | None:
def device_name(hass: HomeAssistant, lookup_value: str) -> str | None: def device_name(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the device name from an device id, or entity id.""" """Get the device name from an device id, or entity id."""
device_reg = device_registry.async_get(hass) device_reg = dr.async_get(hass)
if device := device_reg.async_get(lookup_value): if device := device_reg.async_get(lookup_value):
return device.name_by_user or device.name return device.name_by_user or device.name
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
# Import here, not at top-level to avoid circular import # Import here, not at top-level to avoid circular import
from . import config_validation as cv # noqa: PLC0415 from homeassistant.helpers import config_validation as cv # noqa: PLC0415
try: try:
cv.entity_id(lookup_value) cv.entity_id(lookup_value)
@@ -1432,7 +1430,7 @@ def device_name(hass: HomeAssistant, lookup_value: str) -> str | None:
def device_attr(hass: HomeAssistant, device_or_entity_id: str, attr_name: str) -> Any: def device_attr(hass: HomeAssistant, device_or_entity_id: str, attr_name: str) -> Any:
"""Get the device specific attribute.""" """Get the device specific attribute."""
device_reg = device_registry.async_get(hass) device_reg = dr.async_get(hass)
if not isinstance(device_or_entity_id, str): if not isinstance(device_or_entity_id, str):
raise TemplateError("Must provide a device or entity ID") raise TemplateError("Must provide a device or entity ID")
device = None device = None
@@ -1475,14 +1473,14 @@ def is_device_attr(
def issues(hass: HomeAssistant) -> dict[tuple[str, str], dict[str, Any]]: def issues(hass: HomeAssistant) -> dict[tuple[str, str], dict[str, Any]]:
"""Return all open issues.""" """Return all open issues."""
current_issues = issue_registry.async_get(hass).issues current_issues = ir.async_get(hass).issues
# Use JSON for safe representation # Use JSON for safe representation
return {k: v.to_json() for (k, v) in current_issues.items()} return {k: v.to_json() for (k, v) in current_issues.items()}
def issue(hass: HomeAssistant, domain: str, issue_id: str) -> dict[str, Any] | None: def issue(hass: HomeAssistant, domain: str, issue_id: str) -> dict[str, Any] | None:
"""Get issue by domain and issue_id.""" """Get issue by domain and issue_id."""
result = issue_registry.async_get(hass).async_get_issue(domain, issue_id) result = ir.async_get(hass).async_get_issue(domain, issue_id)
if result: if result:
return result.to_json() return result.to_json()
return None return None
@@ -1505,7 +1503,7 @@ def floor_id(hass: HomeAssistant, lookup_value: Any) -> str | None:
return floors_list[0].floor_id return floors_list[0].floor_id
if aid := area_id(hass, lookup_value): if aid := area_id(hass, lookup_value):
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
if area := area_reg.async_get_area(aid): if area := area_reg.async_get_area(aid):
return area.floor_id return area.floor_id
@@ -1519,7 +1517,7 @@ def floor_name(hass: HomeAssistant, lookup_value: str) -> str | None:
return floor.name return floor.name
if aid := area_id(hass, lookup_value): if aid := area_id(hass, lookup_value):
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
if ( if (
(area := area_reg.async_get_area(aid)) (area := area_reg.async_get_area(aid))
and area.floor_id and area.floor_id
@@ -1542,8 +1540,8 @@ def floor_areas(hass: HomeAssistant, floor_id_or_name: str) -> Iterable[str]:
if _floor_id is None: if _floor_id is None:
return [] return []
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
entries = area_registry.async_entries_for_floor(area_reg, _floor_id) entries = ar.async_entries_for_floor(area_reg, _floor_id)
return [entry.id for entry in entries if entry.id] return [entry.id for entry in entries if entry.id]
@@ -1558,12 +1556,12 @@ def floor_entities(hass: HomeAssistant, floor_id_or_name: str) -> Iterable[str]:
def areas(hass: HomeAssistant) -> Iterable[str | None]: def areas(hass: HomeAssistant) -> Iterable[str | None]:
"""Return all areas.""" """Return all areas."""
return list(area_registry.async_get(hass).areas) return list(ar.async_get(hass).areas)
def area_id(hass: HomeAssistant, lookup_value: str) -> str | None: def area_id(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the area ID from an area name, alias, device id, or entity id.""" """Get the area ID from an area name, alias, device id, or entity id."""
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
lookup_str = str(lookup_value) lookup_str = str(lookup_value)
if area := area_reg.async_get_area_by_name(lookup_str): if area := area_reg.async_get_area_by_name(lookup_str):
return area.id return area.id
@@ -1571,10 +1569,10 @@ def area_id(hass: HomeAssistant, lookup_value: str) -> str | None:
if areas_list: if areas_list:
return areas_list[0].id return areas_list[0].id
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
# Import here, not at top-level to avoid circular import # Import here, not at top-level to avoid circular import
from . import config_validation as cv # noqa: PLC0415 from homeassistant.helpers import config_validation as cv # noqa: PLC0415
try: try:
cv.entity_id(lookup_value) cv.entity_id(lookup_value)
@@ -1596,7 +1594,7 @@ def area_id(hass: HomeAssistant, lookup_value: str) -> str | None:
return None return None
def _get_area_name(area_reg: area_registry.AreaRegistry, valid_area_id: str) -> str: def _get_area_name(area_reg: ar.AreaRegistry, valid_area_id: str) -> str:
"""Get area name from valid area ID.""" """Get area name from valid area ID."""
area = area_reg.async_get_area(valid_area_id) area = area_reg.async_get_area(valid_area_id)
assert area assert area
@@ -1605,14 +1603,14 @@ def _get_area_name(area_reg: area_registry.AreaRegistry, valid_area_id: str) ->
def area_name(hass: HomeAssistant, lookup_value: str) -> str | None: def area_name(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the area name from an area id, device id, or entity id.""" """Get the area name from an area id, device id, or entity id."""
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
if area := area_reg.async_get_area(lookup_value): if area := area_reg.async_get_area(lookup_value):
return area.name return area.name
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
# Import here, not at top-level to avoid circular import # Import here, not at top-level to avoid circular import
from . import config_validation as cv # noqa: PLC0415 from homeassistant.helpers import config_validation as cv # noqa: PLC0415
try: try:
cv.entity_id(lookup_value) cv.entity_id(lookup_value)
@@ -1649,19 +1647,18 @@ def area_entities(hass: HomeAssistant, area_id_or_name: str) -> Iterable[str]:
_area_id = area_id_or_name _area_id = area_id_or_name
if _area_id is None: if _area_id is None:
return [] return []
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
entity_ids = [ entity_ids = [
entry.entity_id entry.entity_id for entry in er.async_entries_for_area(ent_reg, _area_id)
for entry in entity_registry.async_entries_for_area(ent_reg, _area_id)
] ]
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
# We also need to add entities tied to a device in the area that don't themselves # We also need to add entities tied to a device in the area that don't themselves
# have an area specified since they inherit the area from the device. # have an area specified since they inherit the area from the device.
entity_ids.extend( entity_ids.extend(
[ [
entity.entity_id entity.entity_id
for device in device_registry.async_entries_for_area(dev_reg, _area_id) for device in dr.async_entries_for_area(dev_reg, _area_id)
for entity in entity_registry.async_entries_for_device(ent_reg, device.id) for entity in er.async_entries_for_device(ent_reg, device.id)
if entity.area_id is None if entity.area_id is None
] ]
) )
@@ -1679,21 +1676,21 @@ def area_devices(hass: HomeAssistant, area_id_or_name: str) -> Iterable[str]:
_area_id = area_id(hass, area_id_or_name) _area_id = area_id(hass, area_id_or_name)
if _area_id is None: if _area_id is None:
return [] return []
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
entries = device_registry.async_entries_for_area(dev_reg, _area_id) entries = dr.async_entries_for_area(dev_reg, _area_id)
return [entry.id for entry in entries] return [entry.id for entry in entries]
def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None]: def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None]:
"""Return all labels, or those from a area ID, device ID, or entity ID.""" """Return all labels, or those from a area ID, device ID, or entity ID."""
label_reg = label_registry.async_get(hass) label_reg = lr.async_get(hass)
if lookup_value is None: if lookup_value is None:
return list(label_reg.labels) return list(label_reg.labels)
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
# Import here, not at top-level to avoid circular import # Import here, not at top-level to avoid circular import
from . import config_validation as cv # noqa: PLC0415 from homeassistant.helpers import config_validation as cv # noqa: PLC0415
lookup_value = str(lookup_value) lookup_value = str(lookup_value)
@@ -1706,12 +1703,12 @@ def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None
return list(entity.labels) return list(entity.labels)
# Check if this could be a device ID # Check if this could be a device ID
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
if device := dev_reg.async_get(lookup_value): if device := dev_reg.async_get(lookup_value):
return list(device.labels) return list(device.labels)
# Check if this could be a area ID # Check if this could be a area ID
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
if area := area_reg.async_get_area(lookup_value): if area := area_reg.async_get_area(lookup_value):
return list(area.labels) return list(area.labels)
@@ -1720,7 +1717,7 @@ def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None
def label_id(hass: HomeAssistant, lookup_value: Any) -> str | None: def label_id(hass: HomeAssistant, lookup_value: Any) -> str | None:
"""Get the label ID from a label name.""" """Get the label ID from a label name."""
label_reg = label_registry.async_get(hass) label_reg = lr.async_get(hass)
if label := label_reg.async_get_label_by_name(str(lookup_value)): if label := label_reg.async_get_label_by_name(str(lookup_value)):
return label.label_id return label.label_id
return None return None
@@ -1728,7 +1725,7 @@ def label_id(hass: HomeAssistant, lookup_value: Any) -> str | None:
def label_name(hass: HomeAssistant, lookup_value: str) -> str | None: def label_name(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the label name from a label ID.""" """Get the label name from a label ID."""
label_reg = label_registry.async_get(hass) label_reg = lr.async_get(hass)
if label := label_reg.async_get_label(lookup_value): if label := label_reg.async_get_label(lookup_value):
return label.name return label.name
return None return None
@@ -1736,7 +1733,7 @@ def label_name(hass: HomeAssistant, lookup_value: str) -> str | None:
def label_description(hass: HomeAssistant, lookup_value: str) -> str | None: def label_description(hass: HomeAssistant, lookup_value: str) -> str | None:
"""Get the label description from a label ID.""" """Get the label description from a label ID."""
label_reg = label_registry.async_get(hass) label_reg = lr.async_get(hass)
if label := label_reg.async_get_label(lookup_value): if label := label_reg.async_get_label(lookup_value):
return label.description return label.description
return None return None
@@ -1755,8 +1752,8 @@ def label_areas(hass: HomeAssistant, label_id_or_name: str) -> Iterable[str]:
"""Return areas for a given label ID or name.""" """Return areas for a given label ID or name."""
if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None: if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None:
return [] return []
area_reg = area_registry.async_get(hass) area_reg = ar.async_get(hass)
entries = area_registry.async_entries_for_label(area_reg, _label_id) entries = ar.async_entries_for_label(area_reg, _label_id)
return [entry.id for entry in entries] return [entry.id for entry in entries]
@@ -1764,8 +1761,8 @@ def label_devices(hass: HomeAssistant, label_id_or_name: str) -> Iterable[str]:
"""Return device IDs for a given label ID or name.""" """Return device IDs for a given label ID or name."""
if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None: if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None:
return [] return []
dev_reg = device_registry.async_get(hass) dev_reg = dr.async_get(hass)
entries = device_registry.async_entries_for_label(dev_reg, _label_id) entries = dr.async_entries_for_label(dev_reg, _label_id)
return [entry.id for entry in entries] return [entry.id for entry in entries]
@@ -1773,8 +1770,8 @@ def label_entities(hass: HomeAssistant, label_id_or_name: str) -> Iterable[str]:
"""Return entities for a given label ID or name.""" """Return entities for a given label ID or name."""
if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None: if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None:
return [] return []
ent_reg = entity_registry.async_get(hass) ent_reg = er.async_get(hass)
entries = entity_registry.async_entries_for_label(ent_reg, _label_id) entries = er.async_entries_for_label(ent_reg, _label_id)
return [entry.entity_id for entry in entries] return [entry.entity_id for entry in entries]
@@ -1913,7 +1910,7 @@ def distance(hass: HomeAssistant, *args: Any) -> float | None:
def is_hidden_entity(hass: HomeAssistant, entity_id: str) -> bool: def is_hidden_entity(hass: HomeAssistant, entity_id: str) -> bool:
"""Test if an entity is hidden.""" """Test if an entity is hidden."""
entity_reg = entity_registry.async_get(hass) entity_reg = er.async_get(hass)
entry = entity_reg.async_get(entity_id) entry = entity_reg.async_get(entity_id)
return entry is not None and entry.hidden return entry is not None and entry.hidden
@@ -2608,22 +2605,6 @@ def from_hex(value: str) -> bytes:
return bytes.fromhex(value) return bytes.fromhex(value)
def base64_encode(value: str | bytes) -> str:
"""Perform base64 encode."""
if isinstance(value, str):
value = value.encode("utf-8")
return base64.b64encode(value).decode("utf-8")
def base64_decode(value: str, encoding: str | None = "utf-8") -> str | bytes:
"""Perform base64 decode."""
decoded = base64.b64decode(value)
if encoding:
return decoded.decode(encoding)
return decoded
def ordinal(value): def ordinal(value):
"""Perform ordinal conversion.""" """Perform ordinal conversion."""
suffixes = ["th", "st", "nd", "rd"] + ["th"] * 6 # codespell:ignore nd suffixes = ["th", "st", "nd", "rd"] + ["th"] * 6 # codespell:ignore nd
@@ -2928,26 +2909,6 @@ def combine(*args: Any, recursive: bool = False) -> dict[Any, Any]:
return result return result
def md5(value: str) -> str:
"""Generate md5 hash from a string."""
return hashlib.md5(value.encode()).hexdigest()
def sha1(value: str) -> str:
"""Generate sha1 hash from a string."""
return hashlib.sha1(value.encode()).hexdigest()
def sha256(value: str) -> str:
"""Generate sha256 hash from a string."""
return hashlib.sha256(value.encode()).hexdigest()
def sha512(value: str) -> str:
"""Generate sha512 hash from a string."""
return hashlib.sha512(value.encode()).hexdigest()
class TemplateContextManager(AbstractContextManager): class TemplateContextManager(AbstractContextManager):
"""Context manager to store template being parsed or rendered in a ContextVar.""" """Context manager to store template being parsed or rendered in a ContextVar."""
@@ -3096,11 +3057,14 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
"""Initialise template environment.""" """Initialise template environment."""
super().__init__(undefined=make_logging_undefined(strict, log_fn)) super().__init__(undefined=make_logging_undefined(strict, log_fn))
self.hass = hass self.hass = hass
self.limited = limited
self.template_cache: weakref.WeakValueDictionary[ self.template_cache: weakref.WeakValueDictionary[
str | jinja2.nodes.Template, CodeType | None str | jinja2.nodes.Template, CodeType | None
] = weakref.WeakValueDictionary() ] = weakref.WeakValueDictionary()
self.add_extension("jinja2.ext.loopcontrols") self.add_extension("jinja2.ext.loopcontrols")
self.add_extension("jinja2.ext.do") self.add_extension("jinja2.ext.do")
self.add_extension("homeassistant.helpers.template.extensions.Base64Extension")
self.add_extension("homeassistant.helpers.template.extensions.CryptoExtension")
self.globals["acos"] = arc_cosine self.globals["acos"] = arc_cosine
self.globals["as_datetime"] = as_datetime self.globals["as_datetime"] = as_datetime
@@ -3125,16 +3089,12 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
self.globals["is_number"] = is_number self.globals["is_number"] = is_number
self.globals["log"] = logarithm self.globals["log"] = logarithm
self.globals["max"] = min_max_from_filter(self.filters["max"], "max") self.globals["max"] = min_max_from_filter(self.filters["max"], "max")
self.globals["md5"] = md5
self.globals["median"] = median self.globals["median"] = median
self.globals["merge_response"] = merge_response self.globals["merge_response"] = merge_response
self.globals["min"] = min_max_from_filter(self.filters["min"], "min") self.globals["min"] = min_max_from_filter(self.filters["min"], "min")
self.globals["pack"] = struct_pack self.globals["pack"] = struct_pack
self.globals["pi"] = math.pi self.globals["pi"] = math.pi
self.globals["set"] = _to_set self.globals["set"] = _to_set
self.globals["sha1"] = sha1
self.globals["sha256"] = sha256
self.globals["sha512"] = sha512
self.globals["shuffle"] = shuffle self.globals["shuffle"] = shuffle
self.globals["sin"] = sine self.globals["sin"] = sine
self.globals["slugify"] = slugify self.globals["slugify"] = slugify
@@ -3165,8 +3125,6 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
self.filters["atan"] = arc_tangent self.filters["atan"] = arc_tangent
self.filters["atan2"] = arc_tangent2 self.filters["atan2"] = arc_tangent2
self.filters["average"] = average self.filters["average"] = average
self.filters["base64_decode"] = base64_decode
self.filters["base64_encode"] = base64_encode
self.filters["bitwise_and"] = bitwise_and self.filters["bitwise_and"] = bitwise_and
self.filters["bitwise_or"] = bitwise_or self.filters["bitwise_or"] = bitwise_or
self.filters["bitwise_xor"] = bitwise_xor self.filters["bitwise_xor"] = bitwise_xor
@@ -3185,7 +3143,6 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
self.filters["is_defined"] = fail_when_undefined self.filters["is_defined"] = fail_when_undefined
self.filters["is_number"] = is_number self.filters["is_number"] = is_number
self.filters["log"] = logarithm self.filters["log"] = logarithm
self.filters["md5"] = md5
self.filters["median"] = median self.filters["median"] = median
self.filters["multiply"] = multiply self.filters["multiply"] = multiply
self.filters["ord"] = ord self.filters["ord"] = ord
@@ -3198,9 +3155,6 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment):
self.filters["regex_replace"] = regex_replace self.filters["regex_replace"] = regex_replace
self.filters["regex_search"] = regex_search self.filters["regex_search"] = regex_search
self.filters["round"] = forgiving_round self.filters["round"] = forgiving_round
self.filters["sha1"] = sha1
self.filters["sha256"] = sha256
self.filters["sha512"] = sha512
self.filters["shuffle"] = shuffle self.filters["shuffle"] = shuffle
self.filters["sin"] = sine self.filters["sin"] = sine
self.filters["slugify"] = slugify self.filters["slugify"] = slugify

View File

@@ -0,0 +1,6 @@
"""Home Assistant template extensions."""
from .base64 import Base64Extension
from .crypto import CryptoExtension
__all__ = ["Base64Extension", "CryptoExtension"]

View File

@@ -0,0 +1,60 @@
"""Base extension class for Home Assistant template extensions."""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from jinja2.ext import Extension
from jinja2.nodes import Node
from jinja2.parser import Parser
if TYPE_CHECKING:
from homeassistant.helpers.template import TemplateEnvironment
@dataclass
class TemplateFunction:
"""Definition for a template function, filter, or test."""
name: str
func: Callable[..., Any]
as_global: bool = False
as_filter: bool = False
as_test: bool = False
limited_ok: bool = (
True # Whether this function is available in limited environments
)
class BaseTemplateExtension(Extension):
"""Base class for Home Assistant template extensions."""
environment: TemplateEnvironment
def __init__(
self,
environment: TemplateEnvironment,
*,
functions: list[TemplateFunction] | None = None,
) -> None:
"""Initialize the extension with a list of template functions."""
super().__init__(environment)
if functions:
for template_func in functions:
# Skip functions not allowed in limited environments
if self.environment.limited and not template_func.limited_ok:
continue
if template_func.as_global:
environment.globals[template_func.name] = template_func.func
if template_func.as_filter:
environment.filters[template_func.name] = template_func.func
if template_func.as_test:
environment.tests[template_func.name] = template_func.func
def parse(self, parser: Parser) -> Node | list[Node]:
"""Required by Jinja2 Extension base class."""
return []

View File

@@ -0,0 +1,50 @@
"""Base64 encoding and decoding functions for Home Assistant templates."""
from __future__ import annotations
import base64
from typing import TYPE_CHECKING
from .base import BaseTemplateExtension, TemplateFunction
if TYPE_CHECKING:
from homeassistant.helpers.template import TemplateEnvironment
class Base64Extension(BaseTemplateExtension):
"""Jinja2 extension for base64 encoding and decoding functions."""
def __init__(self, environment: TemplateEnvironment) -> None:
"""Initialize the base64 extension."""
super().__init__(
environment,
functions=[
TemplateFunction(
"base64_encode",
self.base64_encode,
as_filter=True,
limited_ok=False,
),
TemplateFunction(
"base64_decode",
self.base64_decode,
as_filter=True,
limited_ok=False,
),
],
)
@staticmethod
def base64_encode(value: str | bytes) -> str:
"""Encode a string or bytes to base64."""
if isinstance(value, str):
value = value.encode("utf-8")
return base64.b64encode(value).decode("utf-8")
@staticmethod
def base64_decode(value: str, encoding: str | None = "utf-8") -> str | bytes:
"""Decode a base64 string."""
decoded = base64.b64decode(value)
if encoding is None:
return decoded
return decoded.decode(encoding)

View File

@@ -0,0 +1,64 @@
"""Cryptographic hash functions for Home Assistant templates."""
from __future__ import annotations
import hashlib
from typing import TYPE_CHECKING
from .base import BaseTemplateExtension, TemplateFunction
if TYPE_CHECKING:
from homeassistant.helpers.template import TemplateEnvironment
class CryptoExtension(BaseTemplateExtension):
"""Jinja2 extension for cryptographic hash functions."""
def __init__(self, environment: TemplateEnvironment) -> None:
"""Initialize the crypto extension."""
super().__init__(
environment,
functions=[
# Hash functions (as globals and filters)
TemplateFunction(
"md5", self.md5, as_global=True, as_filter=True, limited_ok=False
),
TemplateFunction(
"sha1", self.sha1, as_global=True, as_filter=True, limited_ok=False
),
TemplateFunction(
"sha256",
self.sha256,
as_global=True,
as_filter=True,
limited_ok=False,
),
TemplateFunction(
"sha512",
self.sha512,
as_global=True,
as_filter=True,
limited_ok=False,
),
],
)
@staticmethod
def md5(value: str) -> str:
"""Generate md5 hash from a string."""
return hashlib.md5(value.encode()).hexdigest()
@staticmethod
def sha1(value: str) -> str:
"""Generate sha1 hash from a string."""
return hashlib.sha1(value.encode()).hexdigest()
@staticmethod
def sha256(value: str) -> str:
"""Generate sha256 hash from a string."""
return hashlib.sha256(value.encode()).hexdigest()
@staticmethod
def sha512(value: str) -> str:
"""Generate sha512 hash from a string."""
return hashlib.sha512(value.encode()).hexdigest()

View File

@@ -0,0 +1 @@
"""Tests for Home Assistant template engine."""

View File

@@ -0,0 +1 @@
"""Tests for Home Assistant template extensions."""

View File

@@ -0,0 +1,43 @@
"""Test base64 encoding and decoding functions for Home Assistant templates."""
from __future__ import annotations
import pytest
from homeassistant.core import HomeAssistant
from homeassistant.helpers import template
@pytest.mark.parametrize(
("value_template", "expected"),
[
('{{ "homeassistant" | base64_encode }}', "aG9tZWFzc2lzdGFudA=="),
("{{ int('0F010003', base=16) | pack('>I') | base64_encode }}", "DwEAAw=="),
("{{ 'AA01000200150020' | from_hex | base64_encode }}", "qgEAAgAVACA="),
],
)
def test_base64_encode(hass: HomeAssistant, value_template: str, expected: str) -> None:
"""Test the base64_encode filter."""
assert template.Template(value_template, hass).async_render() == expected
def test_base64_decode(hass: HomeAssistant) -> None:
"""Test the base64_decode filter."""
assert (
template.Template(
'{{ "aG9tZWFzc2lzdGFudA==" | base64_decode }}', hass
).async_render()
== "homeassistant"
)
assert (
template.Template(
'{{ "aG9tZWFzc2lzdGFudA==" | base64_decode(None) }}', hass
).async_render()
== b"homeassistant"
)
assert (
template.Template(
'{{ "aG9tZWFzc2lzdGFudA==" | base64_decode("ascii") }}', hass
).async_render()
== "homeassistant"
)

View File

@@ -0,0 +1,58 @@
"""Test cryptographic hash functions for Home Assistant templates."""
from __future__ import annotations
from homeassistant.core import HomeAssistant
from homeassistant.helpers import template
def test_md5(hass: HomeAssistant) -> None:
"""Test the md5 function and filter."""
assert (
template.Template("{{ md5('Home Assistant') }}", hass).async_render()
== "3d15e5c102c3413d0337393c3287e006"
)
assert (
template.Template("{{ 'Home Assistant' | md5 }}", hass).async_render()
== "3d15e5c102c3413d0337393c3287e006"
)
def test_sha1(hass: HomeAssistant) -> None:
"""Test the sha1 function and filter."""
assert (
template.Template("{{ sha1('Home Assistant') }}", hass).async_render()
== "c8fd3bb19b94312664faa619af7729bdbf6e9f8a"
)
assert (
template.Template("{{ 'Home Assistant' | sha1 }}", hass).async_render()
== "c8fd3bb19b94312664faa619af7729bdbf6e9f8a"
)
def test_sha256(hass: HomeAssistant) -> None:
"""Test the sha256 function and filter."""
assert (
template.Template("{{ sha256('Home Assistant') }}", hass).async_render()
== "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b"
)
assert (
template.Template("{{ 'Home Assistant' | sha256 }}", hass).async_render()
== "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b"
)
def test_sha512(hass: HomeAssistant) -> None:
"""Test the sha512 function and filter."""
assert (
template.Template("{{ sha512('Home Assistant') }}", hass).async_render()
== "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb"
)
assert (
template.Template("{{ 'Home Assistant' | sha512 }}", hass).async_render()
== "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb"
)

View File

@@ -1739,41 +1739,6 @@ def test_from_hex(hass: HomeAssistant) -> None:
) )
@pytest.mark.parametrize(
("value_template", "expected"),
[
('{{ "homeassistant" | base64_encode }}', "aG9tZWFzc2lzdGFudA=="),
("{{ int('0F010003', base=16) | pack('>I') | base64_encode }}", "DwEAAw=="),
("{{ 'AA01000200150020' | from_hex | base64_encode }}", "qgEAAgAVACA="),
],
)
def test_base64_encode(hass: HomeAssistant, value_template: str, expected: str) -> None:
"""Test the base64_encode filter."""
assert template.Template(value_template, hass).async_render() == expected
def test_base64_decode(hass: HomeAssistant) -> None:
"""Test the base64_decode filter."""
assert (
template.Template(
'{{ "aG9tZWFzc2lzdGFudA==" | base64_decode }}', hass
).async_render()
== "homeassistant"
)
assert (
template.Template(
'{{ "aG9tZWFzc2lzdGFudA==" | base64_decode(None) }}', hass
).async_render()
== b"homeassistant"
)
assert (
template.Template(
'{{ "aG9tZWFzc2lzdGFudA==" | base64_decode("ascii") }}', hass
).async_render()
== "homeassistant"
)
def test_slugify(hass: HomeAssistant) -> None: def test_slugify(hass: HomeAssistant) -> None:
"""Test the slugify filter.""" """Test the slugify filter."""
assert ( assert (
@@ -7174,58 +7139,6 @@ def test_symmetric_difference(hass: HomeAssistant) -> None:
).async_render() ).async_render()
def test_md5(hass: HomeAssistant) -> None:
"""Test the md5 function and filter."""
assert (
template.Template("{{ md5('Home Assistant') }}", hass).async_render()
== "3d15e5c102c3413d0337393c3287e006"
)
assert (
template.Template("{{ 'Home Assistant' | md5 }}", hass).async_render()
== "3d15e5c102c3413d0337393c3287e006"
)
def test_sha1(hass: HomeAssistant) -> None:
"""Test the sha1 function and filter."""
assert (
template.Template("{{ sha1('Home Assistant') }}", hass).async_render()
== "c8fd3bb19b94312664faa619af7729bdbf6e9f8a"
)
assert (
template.Template("{{ 'Home Assistant' | sha1 }}", hass).async_render()
== "c8fd3bb19b94312664faa619af7729bdbf6e9f8a"
)
def test_sha256(hass: HomeAssistant) -> None:
"""Test the sha256 function and filter."""
assert (
template.Template("{{ sha256('Home Assistant') }}", hass).async_render()
== "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b"
)
assert (
template.Template("{{ 'Home Assistant' | sha256 }}", hass).async_render()
== "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b"
)
def test_sha512(hass: HomeAssistant) -> None:
"""Test the sha512 function and filter."""
assert (
template.Template("{{ sha512('Home Assistant') }}", hass).async_render()
== "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb"
)
assert (
template.Template("{{ 'Home Assistant' | sha512 }}", hass).async_render()
== "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb"
)
def test_combine(hass: HomeAssistant) -> None: def test_combine(hass: HomeAssistant) -> None:
"""Test combine filter and function.""" """Test combine filter and function."""
assert template.Template( assert template.Template(