diff --git a/homeassistant/helpers/template.py b/homeassistant/helpers/template/__init__.py similarity index 95% rename from homeassistant/helpers/template.py rename to homeassistant/helpers/template/__init__.py index 8e3106093aa..357a16c7340 100644 --- a/homeassistant/helpers/template.py +++ b/homeassistant/helpers/template/__init__.py @@ -4,7 +4,6 @@ from __future__ import annotations from ast import literal_eval import asyncio -import base64 import collections.abc from collections.abc import Callable, Generator, Iterable, MutableSequence from contextlib import AbstractContextManager @@ -12,7 +11,6 @@ from contextvars import ContextVar from copy import deepcopy from datetime import date, datetime, time, timedelta from functools import cache, lru_cache, partial, wraps -import hashlib import json import logging import math @@ -71,6 +69,19 @@ from homeassistant.core import ( valid_entity_id, ) from homeassistant.exceptions import TemplateError +from homeassistant.helpers import ( + area_registry as ar, + device_registry as dr, + entity_registry as er, + floor_registry as fr, + issue_registry as ir, + label_registry as lr, + location as loc_helper, +) +from homeassistant.helpers.deprecation import deprecated_function +from homeassistant.helpers.singleton import singleton +from homeassistant.helpers.translation import async_translate_state +from homeassistant.helpers.typing import TemplateVarsType from homeassistant.loader import bind_hass from homeassistant.util import ( convert, @@ -84,20 +95,6 @@ from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads from homeassistant.util.read_only_dict import ReadOnlyDict from homeassistant.util.thread import ThreadWithException -from . import ( - area_registry, - device_registry, - entity_registry, - floor_registry as fr, - issue_registry, - label_registry, - location as loc_helper, -) -from .deprecation import deprecated_function -from .singleton import singleton -from .translation import async_translate_state -from .typing import TemplateVarsType - if TYPE_CHECKING: from _typeshed import OptExcInfo @@ -210,7 +207,7 @@ def async_setup(hass: HomeAssistant) -> bool: if new_size > current_size: lru.set_size(new_size) - from .event import async_track_time_interval # noqa: PLC0415 + from homeassistant.helpers.event import async_track_time_interval # noqa: PLC0415 cancel = async_track_time_interval( hass, _async_adjust_lru_sizes, timedelta(minutes=10) @@ -525,7 +522,10 @@ class Template: Note: A valid hass instance should always be passed in. The hass parameter will be non optional in Home Assistant Core 2025.10. """ - from .frame import ReportBehavior, report_usage # noqa: PLC0415 + from homeassistant.helpers.frame import ( # noqa: PLC0415 + ReportBehavior, + report_usage, + ) if not isinstance(template, str): raise TypeError("Expected template to be a string") @@ -973,7 +973,7 @@ class StateTranslated: state_value = state.state domain = state.domain device_class = state.attributes.get("device_class") - entry = entity_registry.async_get(self._hass).async_get(entity_id) + entry = er.async_get(self._hass).async_get(entity_id) platform = None if entry is None else entry.platform translation_key = None if entry is None else entry.translation_key @@ -1274,7 +1274,7 @@ def forgiving_boolean[_T]( """Try to convert value to a boolean.""" try: # Import here, not at top-level to avoid circular import - from . import config_validation as cv # noqa: PLC0415 + from homeassistant.helpers import config_validation as cv # noqa: PLC0415 return cv.boolean(value) except vol.Invalid: @@ -1299,7 +1299,7 @@ def result_as_boolean(template_result: Any | None) -> bool: def expand(hass: HomeAssistant, *args: Any) -> Iterable[State]: """Expand out any groups and zones into entity states.""" # circular import. - from . import entity as entity_helper # noqa: PLC0415 + from homeassistant.helpers import entity as entity_helper # noqa: PLC0415 search = list(args) found = {} @@ -1341,8 +1341,8 @@ def expand(hass: HomeAssistant, *args: Any) -> Iterable[State]: def device_entities(hass: HomeAssistant, _device_id: str) -> Iterable[str]: """Get entity ids for entities tied to a device.""" - entity_reg = entity_registry.async_get(hass) - entries = entity_registry.async_entries_for_device(entity_reg, _device_id) + entity_reg = er.async_get(hass) + entries = er.async_entries_for_device(entity_reg, _device_id) return [entry.entity_id for entry in entries] @@ -1360,19 +1360,17 @@ def integration_entities(hass: HomeAssistant, entry_name: str) -> Iterable[str]: # first try if there are any config entries with a matching title entities: list[str] = [] - ent_reg = entity_registry.async_get(hass) + ent_reg = er.async_get(hass) for entry in hass.config_entries.async_entries(): if entry.title != entry_name: continue - entries = entity_registry.async_entries_for_config_entry( - ent_reg, entry.entry_id - ) + entries = er.async_entries_for_config_entry(ent_reg, entry.entry_id) entities.extend(entry.entity_id for entry in entries) if entities: return entities # fallback to just returning all entities for a domain - from .entity import entity_sources # noqa: PLC0415 + from homeassistant.helpers.entity import entity_sources # noqa: PLC0415 return [ entity_id @@ -1383,7 +1381,7 @@ def integration_entities(hass: HomeAssistant, entry_name: str) -> Iterable[str]: def config_entry_id(hass: HomeAssistant, entity_id: str) -> str | None: """Get an config entry ID from an entity ID.""" - entity_reg = entity_registry.async_get(hass) + entity_reg = er.async_get(hass) if entity := entity_reg.async_get(entity_id): return entity.config_entry_id return None @@ -1391,12 +1389,12 @@ def config_entry_id(hass: HomeAssistant, entity_id: str) -> str | None: def device_id(hass: HomeAssistant, entity_id_or_device_name: str) -> str | None: """Get a device ID from an entity ID or device name.""" - entity_reg = entity_registry.async_get(hass) + entity_reg = er.async_get(hass) entity = entity_reg.async_get(entity_id_or_device_name) if entity is not None: return entity.device_id - dev_reg = device_registry.async_get(hass) + dev_reg = dr.async_get(hass) return next( ( device_id @@ -1410,13 +1408,13 @@ def device_id(hass: HomeAssistant, entity_id_or_device_name: str) -> str | None: def device_name(hass: HomeAssistant, lookup_value: str) -> str | None: """Get the device name from an device id, or entity id.""" - device_reg = device_registry.async_get(hass) + device_reg = dr.async_get(hass) if device := device_reg.async_get(lookup_value): return device.name_by_user or device.name - ent_reg = entity_registry.async_get(hass) + ent_reg = er.async_get(hass) # Import here, not at top-level to avoid circular import - from . import config_validation as cv # noqa: PLC0415 + from homeassistant.helpers import config_validation as cv # noqa: PLC0415 try: cv.entity_id(lookup_value) @@ -1432,7 +1430,7 @@ def device_name(hass: HomeAssistant, lookup_value: str) -> str | None: def device_attr(hass: HomeAssistant, device_or_entity_id: str, attr_name: str) -> Any: """Get the device specific attribute.""" - device_reg = device_registry.async_get(hass) + device_reg = dr.async_get(hass) if not isinstance(device_or_entity_id, str): raise TemplateError("Must provide a device or entity ID") device = None @@ -1475,14 +1473,14 @@ def is_device_attr( def issues(hass: HomeAssistant) -> dict[tuple[str, str], dict[str, Any]]: """Return all open issues.""" - current_issues = issue_registry.async_get(hass).issues + current_issues = ir.async_get(hass).issues # Use JSON for safe representation return {k: v.to_json() for (k, v) in current_issues.items()} def issue(hass: HomeAssistant, domain: str, issue_id: str) -> dict[str, Any] | None: """Get issue by domain and issue_id.""" - result = issue_registry.async_get(hass).async_get_issue(domain, issue_id) + result = ir.async_get(hass).async_get_issue(domain, issue_id) if result: return result.to_json() return None @@ -1505,7 +1503,7 @@ def floor_id(hass: HomeAssistant, lookup_value: Any) -> str | None: return floors_list[0].floor_id if aid := area_id(hass, lookup_value): - area_reg = area_registry.async_get(hass) + area_reg = ar.async_get(hass) if area := area_reg.async_get_area(aid): return area.floor_id @@ -1519,7 +1517,7 @@ def floor_name(hass: HomeAssistant, lookup_value: str) -> str | None: return floor.name if aid := area_id(hass, lookup_value): - area_reg = area_registry.async_get(hass) + area_reg = ar.async_get(hass) if ( (area := area_reg.async_get_area(aid)) and area.floor_id @@ -1542,8 +1540,8 @@ def floor_areas(hass: HomeAssistant, floor_id_or_name: str) -> Iterable[str]: if _floor_id is None: return [] - area_reg = area_registry.async_get(hass) - entries = area_registry.async_entries_for_floor(area_reg, _floor_id) + area_reg = ar.async_get(hass) + entries = ar.async_entries_for_floor(area_reg, _floor_id) return [entry.id for entry in entries if entry.id] @@ -1558,12 +1556,12 @@ def floor_entities(hass: HomeAssistant, floor_id_or_name: str) -> Iterable[str]: def areas(hass: HomeAssistant) -> Iterable[str | None]: """Return all areas.""" - return list(area_registry.async_get(hass).areas) + return list(ar.async_get(hass).areas) def area_id(hass: HomeAssistant, lookup_value: str) -> str | None: """Get the area ID from an area name, alias, device id, or entity id.""" - area_reg = area_registry.async_get(hass) + area_reg = ar.async_get(hass) lookup_str = str(lookup_value) if area := area_reg.async_get_area_by_name(lookup_str): return area.id @@ -1571,10 +1569,10 @@ def area_id(hass: HomeAssistant, lookup_value: str) -> str | None: if areas_list: return areas_list[0].id - ent_reg = entity_registry.async_get(hass) - dev_reg = device_registry.async_get(hass) + ent_reg = er.async_get(hass) + dev_reg = dr.async_get(hass) # Import here, not at top-level to avoid circular import - from . import config_validation as cv # noqa: PLC0415 + from homeassistant.helpers import config_validation as cv # noqa: PLC0415 try: cv.entity_id(lookup_value) @@ -1596,7 +1594,7 @@ def area_id(hass: HomeAssistant, lookup_value: str) -> str | None: return None -def _get_area_name(area_reg: area_registry.AreaRegistry, valid_area_id: str) -> str: +def _get_area_name(area_reg: ar.AreaRegistry, valid_area_id: str) -> str: """Get area name from valid area ID.""" area = area_reg.async_get_area(valid_area_id) assert area @@ -1605,14 +1603,14 @@ def _get_area_name(area_reg: area_registry.AreaRegistry, valid_area_id: str) -> def area_name(hass: HomeAssistant, lookup_value: str) -> str | None: """Get the area name from an area id, device id, or entity id.""" - area_reg = area_registry.async_get(hass) + area_reg = ar.async_get(hass) if area := area_reg.async_get_area(lookup_value): return area.name - dev_reg = device_registry.async_get(hass) - ent_reg = entity_registry.async_get(hass) + dev_reg = dr.async_get(hass) + ent_reg = er.async_get(hass) # Import here, not at top-level to avoid circular import - from . import config_validation as cv # noqa: PLC0415 + from homeassistant.helpers import config_validation as cv # noqa: PLC0415 try: cv.entity_id(lookup_value) @@ -1649,19 +1647,18 @@ def area_entities(hass: HomeAssistant, area_id_or_name: str) -> Iterable[str]: _area_id = area_id_or_name if _area_id is None: return [] - ent_reg = entity_registry.async_get(hass) + ent_reg = er.async_get(hass) entity_ids = [ - entry.entity_id - for entry in entity_registry.async_entries_for_area(ent_reg, _area_id) + entry.entity_id for entry in er.async_entries_for_area(ent_reg, _area_id) ] - dev_reg = device_registry.async_get(hass) + dev_reg = dr.async_get(hass) # We also need to add entities tied to a device in the area that don't themselves # have an area specified since they inherit the area from the device. entity_ids.extend( [ entity.entity_id - for device in device_registry.async_entries_for_area(dev_reg, _area_id) - for entity in entity_registry.async_entries_for_device(ent_reg, device.id) + for device in dr.async_entries_for_area(dev_reg, _area_id) + for entity in er.async_entries_for_device(ent_reg, device.id) if entity.area_id is None ] ) @@ -1679,21 +1676,21 @@ def area_devices(hass: HomeAssistant, area_id_or_name: str) -> Iterable[str]: _area_id = area_id(hass, area_id_or_name) if _area_id is None: return [] - dev_reg = device_registry.async_get(hass) - entries = device_registry.async_entries_for_area(dev_reg, _area_id) + dev_reg = dr.async_get(hass) + entries = dr.async_entries_for_area(dev_reg, _area_id) return [entry.id for entry in entries] def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None]: """Return all labels, or those from a area ID, device ID, or entity ID.""" - label_reg = label_registry.async_get(hass) + label_reg = lr.async_get(hass) if lookup_value is None: return list(label_reg.labels) - ent_reg = entity_registry.async_get(hass) + ent_reg = er.async_get(hass) # Import here, not at top-level to avoid circular import - from . import config_validation as cv # noqa: PLC0415 + from homeassistant.helpers import config_validation as cv # noqa: PLC0415 lookup_value = str(lookup_value) @@ -1706,12 +1703,12 @@ def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None return list(entity.labels) # Check if this could be a device ID - dev_reg = device_registry.async_get(hass) + dev_reg = dr.async_get(hass) if device := dev_reg.async_get(lookup_value): return list(device.labels) # Check if this could be a area ID - area_reg = area_registry.async_get(hass) + area_reg = ar.async_get(hass) if area := area_reg.async_get_area(lookup_value): return list(area.labels) @@ -1720,7 +1717,7 @@ def labels(hass: HomeAssistant, lookup_value: Any = None) -> Iterable[str | None def label_id(hass: HomeAssistant, lookup_value: Any) -> str | None: """Get the label ID from a label name.""" - label_reg = label_registry.async_get(hass) + label_reg = lr.async_get(hass) if label := label_reg.async_get_label_by_name(str(lookup_value)): return label.label_id return None @@ -1728,7 +1725,7 @@ def label_id(hass: HomeAssistant, lookup_value: Any) -> str | None: def label_name(hass: HomeAssistant, lookup_value: str) -> str | None: """Get the label name from a label ID.""" - label_reg = label_registry.async_get(hass) + label_reg = lr.async_get(hass) if label := label_reg.async_get_label(lookup_value): return label.name return None @@ -1736,7 +1733,7 @@ def label_name(hass: HomeAssistant, lookup_value: str) -> str | None: def label_description(hass: HomeAssistant, lookup_value: str) -> str | None: """Get the label description from a label ID.""" - label_reg = label_registry.async_get(hass) + label_reg = lr.async_get(hass) if label := label_reg.async_get_label(lookup_value): return label.description return None @@ -1755,8 +1752,8 @@ def label_areas(hass: HomeAssistant, label_id_or_name: str) -> Iterable[str]: """Return areas for a given label ID or name.""" if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None: return [] - area_reg = area_registry.async_get(hass) - entries = area_registry.async_entries_for_label(area_reg, _label_id) + area_reg = ar.async_get(hass) + entries = ar.async_entries_for_label(area_reg, _label_id) return [entry.id for entry in entries] @@ -1764,8 +1761,8 @@ def label_devices(hass: HomeAssistant, label_id_or_name: str) -> Iterable[str]: """Return device IDs for a given label ID or name.""" if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None: return [] - dev_reg = device_registry.async_get(hass) - entries = device_registry.async_entries_for_label(dev_reg, _label_id) + dev_reg = dr.async_get(hass) + entries = dr.async_entries_for_label(dev_reg, _label_id) return [entry.id for entry in entries] @@ -1773,8 +1770,8 @@ def label_entities(hass: HomeAssistant, label_id_or_name: str) -> Iterable[str]: """Return entities for a given label ID or name.""" if (_label_id := _label_id_or_name(hass, label_id_or_name)) is None: return [] - ent_reg = entity_registry.async_get(hass) - entries = entity_registry.async_entries_for_label(ent_reg, _label_id) + ent_reg = er.async_get(hass) + entries = er.async_entries_for_label(ent_reg, _label_id) return [entry.entity_id for entry in entries] @@ -1913,7 +1910,7 @@ def distance(hass: HomeAssistant, *args: Any) -> float | None: def is_hidden_entity(hass: HomeAssistant, entity_id: str) -> bool: """Test if an entity is hidden.""" - entity_reg = entity_registry.async_get(hass) + entity_reg = er.async_get(hass) entry = entity_reg.async_get(entity_id) return entry is not None and entry.hidden @@ -2608,22 +2605,6 @@ def from_hex(value: str) -> bytes: return bytes.fromhex(value) -def base64_encode(value: str | bytes) -> str: - """Perform base64 encode.""" - if isinstance(value, str): - value = value.encode("utf-8") - return base64.b64encode(value).decode("utf-8") - - -def base64_decode(value: str, encoding: str | None = "utf-8") -> str | bytes: - """Perform base64 decode.""" - decoded = base64.b64decode(value) - if encoding: - return decoded.decode(encoding) - - return decoded - - def ordinal(value): """Perform ordinal conversion.""" suffixes = ["th", "st", "nd", "rd"] + ["th"] * 6 # codespell:ignore nd @@ -2928,26 +2909,6 @@ def combine(*args: Any, recursive: bool = False) -> dict[Any, Any]: return result -def md5(value: str) -> str: - """Generate md5 hash from a string.""" - return hashlib.md5(value.encode()).hexdigest() - - -def sha1(value: str) -> str: - """Generate sha1 hash from a string.""" - return hashlib.sha1(value.encode()).hexdigest() - - -def sha256(value: str) -> str: - """Generate sha256 hash from a string.""" - return hashlib.sha256(value.encode()).hexdigest() - - -def sha512(value: str) -> str: - """Generate sha512 hash from a string.""" - return hashlib.sha512(value.encode()).hexdigest() - - class TemplateContextManager(AbstractContextManager): """Context manager to store template being parsed or rendered in a ContextVar.""" @@ -3096,11 +3057,14 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment): """Initialise template environment.""" super().__init__(undefined=make_logging_undefined(strict, log_fn)) self.hass = hass + self.limited = limited self.template_cache: weakref.WeakValueDictionary[ str | jinja2.nodes.Template, CodeType | None ] = weakref.WeakValueDictionary() self.add_extension("jinja2.ext.loopcontrols") self.add_extension("jinja2.ext.do") + self.add_extension("homeassistant.helpers.template.extensions.Base64Extension") + self.add_extension("homeassistant.helpers.template.extensions.CryptoExtension") self.globals["acos"] = arc_cosine self.globals["as_datetime"] = as_datetime @@ -3125,16 +3089,12 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment): self.globals["is_number"] = is_number self.globals["log"] = logarithm self.globals["max"] = min_max_from_filter(self.filters["max"], "max") - self.globals["md5"] = md5 self.globals["median"] = median self.globals["merge_response"] = merge_response self.globals["min"] = min_max_from_filter(self.filters["min"], "min") self.globals["pack"] = struct_pack self.globals["pi"] = math.pi self.globals["set"] = _to_set - self.globals["sha1"] = sha1 - self.globals["sha256"] = sha256 - self.globals["sha512"] = sha512 self.globals["shuffle"] = shuffle self.globals["sin"] = sine self.globals["slugify"] = slugify @@ -3165,8 +3125,6 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment): self.filters["atan"] = arc_tangent self.filters["atan2"] = arc_tangent2 self.filters["average"] = average - self.filters["base64_decode"] = base64_decode - self.filters["base64_encode"] = base64_encode self.filters["bitwise_and"] = bitwise_and self.filters["bitwise_or"] = bitwise_or self.filters["bitwise_xor"] = bitwise_xor @@ -3185,7 +3143,6 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment): self.filters["is_defined"] = fail_when_undefined self.filters["is_number"] = is_number self.filters["log"] = logarithm - self.filters["md5"] = md5 self.filters["median"] = median self.filters["multiply"] = multiply self.filters["ord"] = ord @@ -3198,9 +3155,6 @@ class TemplateEnvironment(ImmutableSandboxedEnvironment): self.filters["regex_replace"] = regex_replace self.filters["regex_search"] = regex_search self.filters["round"] = forgiving_round - self.filters["sha1"] = sha1 - self.filters["sha256"] = sha256 - self.filters["sha512"] = sha512 self.filters["shuffle"] = shuffle self.filters["sin"] = sine self.filters["slugify"] = slugify diff --git a/homeassistant/helpers/template/extensions/__init__.py b/homeassistant/helpers/template/extensions/__init__.py new file mode 100644 index 00000000000..d1ed7e093fa --- /dev/null +++ b/homeassistant/helpers/template/extensions/__init__.py @@ -0,0 +1,6 @@ +"""Home Assistant template extensions.""" + +from .base64 import Base64Extension +from .crypto import CryptoExtension + +__all__ = ["Base64Extension", "CryptoExtension"] diff --git a/homeassistant/helpers/template/extensions/base.py b/homeassistant/helpers/template/extensions/base.py new file mode 100644 index 00000000000..142e9e77d5e --- /dev/null +++ b/homeassistant/helpers/template/extensions/base.py @@ -0,0 +1,60 @@ +"""Base extension class for Home Assistant template extensions.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from jinja2.ext import Extension +from jinja2.nodes import Node +from jinja2.parser import Parser + +if TYPE_CHECKING: + from homeassistant.helpers.template import TemplateEnvironment + + +@dataclass +class TemplateFunction: + """Definition for a template function, filter, or test.""" + + name: str + func: Callable[..., Any] + as_global: bool = False + as_filter: bool = False + as_test: bool = False + limited_ok: bool = ( + True # Whether this function is available in limited environments + ) + + +class BaseTemplateExtension(Extension): + """Base class for Home Assistant template extensions.""" + + environment: TemplateEnvironment + + def __init__( + self, + environment: TemplateEnvironment, + *, + functions: list[TemplateFunction] | None = None, + ) -> None: + """Initialize the extension with a list of template functions.""" + super().__init__(environment) + + if functions: + for template_func in functions: + # Skip functions not allowed in limited environments + if self.environment.limited and not template_func.limited_ok: + continue + + if template_func.as_global: + environment.globals[template_func.name] = template_func.func + if template_func.as_filter: + environment.filters[template_func.name] = template_func.func + if template_func.as_test: + environment.tests[template_func.name] = template_func.func + + def parse(self, parser: Parser) -> Node | list[Node]: + """Required by Jinja2 Extension base class.""" + return [] diff --git a/homeassistant/helpers/template/extensions/base64.py b/homeassistant/helpers/template/extensions/base64.py new file mode 100644 index 00000000000..3ec88bf14f4 --- /dev/null +++ b/homeassistant/helpers/template/extensions/base64.py @@ -0,0 +1,50 @@ +"""Base64 encoding and decoding functions for Home Assistant templates.""" + +from __future__ import annotations + +import base64 +from typing import TYPE_CHECKING + +from .base import BaseTemplateExtension, TemplateFunction + +if TYPE_CHECKING: + from homeassistant.helpers.template import TemplateEnvironment + + +class Base64Extension(BaseTemplateExtension): + """Jinja2 extension for base64 encoding and decoding functions.""" + + def __init__(self, environment: TemplateEnvironment) -> None: + """Initialize the base64 extension.""" + super().__init__( + environment, + functions=[ + TemplateFunction( + "base64_encode", + self.base64_encode, + as_filter=True, + limited_ok=False, + ), + TemplateFunction( + "base64_decode", + self.base64_decode, + as_filter=True, + limited_ok=False, + ), + ], + ) + + @staticmethod + def base64_encode(value: str | bytes) -> str: + """Encode a string or bytes to base64.""" + if isinstance(value, str): + value = value.encode("utf-8") + return base64.b64encode(value).decode("utf-8") + + @staticmethod + def base64_decode(value: str, encoding: str | None = "utf-8") -> str | bytes: + """Decode a base64 string.""" + decoded = base64.b64decode(value) + if encoding is None: + return decoded + return decoded.decode(encoding) diff --git a/homeassistant/helpers/template/extensions/crypto.py b/homeassistant/helpers/template/extensions/crypto.py new file mode 100644 index 00000000000..c3ff165d727 --- /dev/null +++ b/homeassistant/helpers/template/extensions/crypto.py @@ -0,0 +1,64 @@ +"""Cryptographic hash functions for Home Assistant templates.""" + +from __future__ import annotations + +import hashlib +from typing import TYPE_CHECKING + +from .base import BaseTemplateExtension, TemplateFunction + +if TYPE_CHECKING: + from homeassistant.helpers.template import TemplateEnvironment + + +class CryptoExtension(BaseTemplateExtension): + """Jinja2 extension for cryptographic hash functions.""" + + def __init__(self, environment: TemplateEnvironment) -> None: + """Initialize the crypto extension.""" + super().__init__( + environment, + functions=[ + # Hash functions (as globals and filters) + TemplateFunction( + "md5", self.md5, as_global=True, as_filter=True, limited_ok=False + ), + TemplateFunction( + "sha1", self.sha1, as_global=True, as_filter=True, limited_ok=False + ), + TemplateFunction( + "sha256", + self.sha256, + as_global=True, + as_filter=True, + limited_ok=False, + ), + TemplateFunction( + "sha512", + self.sha512, + as_global=True, + as_filter=True, + limited_ok=False, + ), + ], + ) + + @staticmethod + def md5(value: str) -> str: + """Generate md5 hash from a string.""" + return hashlib.md5(value.encode()).hexdigest() + + @staticmethod + def sha1(value: str) -> str: + """Generate sha1 hash from a string.""" + return hashlib.sha1(value.encode()).hexdigest() + + @staticmethod + def sha256(value: str) -> str: + """Generate sha256 hash from a string.""" + return hashlib.sha256(value.encode()).hexdigest() + + @staticmethod + def sha512(value: str) -> str: + """Generate sha512 hash from a string.""" + return hashlib.sha512(value.encode()).hexdigest() diff --git a/tests/helpers/template/__init__.py b/tests/helpers/template/__init__.py new file mode 100644 index 00000000000..f1e980fd2fb --- /dev/null +++ b/tests/helpers/template/__init__.py @@ -0,0 +1 @@ +"""Tests for Home Assistant template engine.""" diff --git a/tests/helpers/template/extensions/__init__.py b/tests/helpers/template/extensions/__init__.py new file mode 100644 index 00000000000..43b7c1caccf --- /dev/null +++ b/tests/helpers/template/extensions/__init__.py @@ -0,0 +1 @@ +"""Tests for Home Assistant template extensions.""" diff --git a/tests/helpers/template/extensions/test_base64.py b/tests/helpers/template/extensions/test_base64.py new file mode 100644 index 00000000000..b0c1fb35134 --- /dev/null +++ b/tests/helpers/template/extensions/test_base64.py @@ -0,0 +1,43 @@ +"""Test base64 encoding and decoding functions for Home Assistant templates.""" + +from __future__ import annotations + +import pytest + +from homeassistant.core import HomeAssistant +from homeassistant.helpers import template + + +@pytest.mark.parametrize( + ("value_template", "expected"), + [ + ('{{ "homeassistant" | base64_encode }}', "aG9tZWFzc2lzdGFudA=="), + ("{{ int('0F010003', base=16) | pack('>I') | base64_encode }}", "DwEAAw=="), + ("{{ 'AA01000200150020' | from_hex | base64_encode }}", "qgEAAgAVACA="), + ], +) +def test_base64_encode(hass: HomeAssistant, value_template: str, expected: str) -> None: + """Test the base64_encode filter.""" + assert template.Template(value_template, hass).async_render() == expected + + +def test_base64_decode(hass: HomeAssistant) -> None: + """Test the base64_decode filter.""" + assert ( + template.Template( + '{{ "aG9tZWFzc2lzdGFudA==" | base64_decode }}', hass + ).async_render() + == "homeassistant" + ) + assert ( + template.Template( + '{{ "aG9tZWFzc2lzdGFudA==" | base64_decode(None) }}', hass + ).async_render() + == b"homeassistant" + ) + assert ( + template.Template( + '{{ "aG9tZWFzc2lzdGFudA==" | base64_decode("ascii") }}', hass + ).async_render() + == "homeassistant" + ) diff --git a/tests/helpers/template/extensions/test_crypto.py b/tests/helpers/template/extensions/test_crypto.py new file mode 100644 index 00000000000..f1e4c3b39cc --- /dev/null +++ b/tests/helpers/template/extensions/test_crypto.py @@ -0,0 +1,58 @@ +"""Test cryptographic hash functions for Home Assistant templates.""" + +from __future__ import annotations + +from homeassistant.core import HomeAssistant +from homeassistant.helpers import template + + +def test_md5(hass: HomeAssistant) -> None: + """Test the md5 function and filter.""" + assert ( + template.Template("{{ md5('Home Assistant') }}", hass).async_render() + == "3d15e5c102c3413d0337393c3287e006" + ) + + assert ( + template.Template("{{ 'Home Assistant' | md5 }}", hass).async_render() + == "3d15e5c102c3413d0337393c3287e006" + ) + + +def test_sha1(hass: HomeAssistant) -> None: + """Test the sha1 function and filter.""" + assert ( + template.Template("{{ sha1('Home Assistant') }}", hass).async_render() + == "c8fd3bb19b94312664faa619af7729bdbf6e9f8a" + ) + + assert ( + template.Template("{{ 'Home Assistant' | sha1 }}", hass).async_render() + == "c8fd3bb19b94312664faa619af7729bdbf6e9f8a" + ) + + +def test_sha256(hass: HomeAssistant) -> None: + """Test the sha256 function and filter.""" + assert ( + template.Template("{{ sha256('Home Assistant') }}", hass).async_render() + == "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b" + ) + + assert ( + template.Template("{{ 'Home Assistant' | sha256 }}", hass).async_render() + == "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b" + ) + + +def test_sha512(hass: HomeAssistant) -> None: + """Test the sha512 function and filter.""" + assert ( + template.Template("{{ sha512('Home Assistant') }}", hass).async_render() + == "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb" + ) + + assert ( + template.Template("{{ 'Home Assistant' | sha512 }}", hass).async_render() + == "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb" + ) diff --git a/tests/helpers/snapshots/test_template.ambr b/tests/helpers/template/snapshots/test_init.ambr similarity index 100% rename from tests/helpers/snapshots/test_template.ambr rename to tests/helpers/template/snapshots/test_init.ambr diff --git a/tests/helpers/test_template.py b/tests/helpers/template/test_init.py similarity index 98% rename from tests/helpers/test_template.py rename to tests/helpers/template/test_init.py index 85a2673f17d..6d4c27123fc 100644 --- a/tests/helpers/test_template.py +++ b/tests/helpers/template/test_init.py @@ -1739,41 +1739,6 @@ def test_from_hex(hass: HomeAssistant) -> None: ) -@pytest.mark.parametrize( - ("value_template", "expected"), - [ - ('{{ "homeassistant" | base64_encode }}', "aG9tZWFzc2lzdGFudA=="), - ("{{ int('0F010003', base=16) | pack('>I') | base64_encode }}", "DwEAAw=="), - ("{{ 'AA01000200150020' | from_hex | base64_encode }}", "qgEAAgAVACA="), - ], -) -def test_base64_encode(hass: HomeAssistant, value_template: str, expected: str) -> None: - """Test the base64_encode filter.""" - assert template.Template(value_template, hass).async_render() == expected - - -def test_base64_decode(hass: HomeAssistant) -> None: - """Test the base64_decode filter.""" - assert ( - template.Template( - '{{ "aG9tZWFzc2lzdGFudA==" | base64_decode }}', hass - ).async_render() - == "homeassistant" - ) - assert ( - template.Template( - '{{ "aG9tZWFzc2lzdGFudA==" | base64_decode(None) }}', hass - ).async_render() - == b"homeassistant" - ) - assert ( - template.Template( - '{{ "aG9tZWFzc2lzdGFudA==" | base64_decode("ascii") }}', hass - ).async_render() - == "homeassistant" - ) - - def test_slugify(hass: HomeAssistant) -> None: """Test the slugify filter.""" assert ( @@ -7174,58 +7139,6 @@ def test_symmetric_difference(hass: HomeAssistant) -> None: ).async_render() -def test_md5(hass: HomeAssistant) -> None: - """Test the md5 function and filter.""" - assert ( - template.Template("{{ md5('Home Assistant') }}", hass).async_render() - == "3d15e5c102c3413d0337393c3287e006" - ) - - assert ( - template.Template("{{ 'Home Assistant' | md5 }}", hass).async_render() - == "3d15e5c102c3413d0337393c3287e006" - ) - - -def test_sha1(hass: HomeAssistant) -> None: - """Test the sha1 function and filter.""" - assert ( - template.Template("{{ sha1('Home Assistant') }}", hass).async_render() - == "c8fd3bb19b94312664faa619af7729bdbf6e9f8a" - ) - - assert ( - template.Template("{{ 'Home Assistant' | sha1 }}", hass).async_render() - == "c8fd3bb19b94312664faa619af7729bdbf6e9f8a" - ) - - -def test_sha256(hass: HomeAssistant) -> None: - """Test the sha256 function and filter.""" - assert ( - template.Template("{{ sha256('Home Assistant') }}", hass).async_render() - == "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b" - ) - - assert ( - template.Template("{{ 'Home Assistant' | sha256 }}", hass).async_render() - == "2a366abb0cd47f51f3725bf0fb7ebcb4fefa6e20f4971e25fe2bb8da8145ce2b" - ) - - -def test_sha512(hass: HomeAssistant) -> None: - """Test the sha512 function and filter.""" - assert ( - template.Template("{{ sha512('Home Assistant') }}", hass).async_render() - == "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb" - ) - - assert ( - template.Template("{{ 'Home Assistant' | sha512 }}", hass).async_render() - == "9e3c2cdd1fbab0037378d37e1baf8a3a4bf92c54b56ad1d459deee30ccbb2acbebd7a3614552ea08992ad27dedeb7b4c5473525ba90cb73dbe8b9ec5f69295bb" - ) - - def test_combine(hass: HomeAssistant) -> None: """Test combine filter and function.""" assert template.Template(