From 0335c9e32b66cb68e15feaf1eacbd84aaa646a9e Mon Sep 17 00:00:00 2001 From: abmantis Date: Thu, 3 Jul 2025 22:59:32 +0100 Subject: [PATCH] Use common method in triggers.py --- homeassistant/helpers/trigger.py | 235 +------------------------------ 1 file changed, 3 insertions(+), 232 deletions(-) diff --git a/homeassistant/helpers/trigger.py b/homeassistant/helpers/trigger.py index 370954de744..e3dc4fd5753 100644 --- a/homeassistant/helpers/trigger.py +++ b/homeassistant/helpers/trigger.py @@ -6,26 +6,19 @@ import abc import asyncio from collections import defaultdict from collections.abc import Callable, Coroutine, Iterable -import dataclasses from dataclasses import dataclass, field import functools import logging -from typing import TYPE_CHECKING, Any, Protocol, TypedDict, TypeGuard, cast +from typing import TYPE_CHECKING, Any, Protocol, TypedDict, cast import voluptuous as vol from homeassistant.const import ( - ATTR_AREA_ID, - ATTR_DEVICE_ID, - ATTR_ENTITY_ID, - ATTR_FLOOR_ID, - ATTR_LABEL_ID, CONF_ALIAS, CONF_ENABLED, CONF_ID, CONF_PLATFORM, CONF_VARIABLES, - ENTITY_MATCH_NONE, ) from homeassistant.core import ( CALLBACK_TYPE, @@ -49,17 +42,10 @@ from homeassistant.util.hass_dict import HassKey from homeassistant.util.yaml import load_yaml_dict from homeassistant.util.yaml.loader import JSON_TYPE -from . import ( - area_registry, - config_validation as cv, - device_registry, - entity_registry, - floor_registry, - label_registry, -) +from . import area_registry, config_validation as cv, device_registry, entity_registry from .event import EventStateChangedData, async_track_state_change_event -from .group import expand_entity_ids from .integration_platform import async_process_integration_platforms +from .selector import TargetSelectorData, async_extract_referenced_entity_ids from .template import Template from .typing import ConfigType, TemplateVarsType @@ -637,221 +623,6 @@ async def async_get_all_descriptions( return new_descriptions_cache -def _has_match(ids: str | list[str] | None) -> TypeGuard[str | list[str]]: - """Check if ids can match anything.""" - return ids not in (None, ENTITY_MATCH_NONE) - - -# TODO(abmantis): Since this is a copy from the service one, move it to a common place and use it in both places -class TargetSelectorData: - """Class to hold data of target selector.""" - - __slots__ = ("area_ids", "device_ids", "entity_ids", "floor_ids", "label_ids") - - def __init__(self, config: ConfigType) -> None: - """Extract ids from the config.""" - entity_ids: str | list | None = config.get(ATTR_ENTITY_ID) - device_ids: str | list | None = config.get(ATTR_DEVICE_ID) - area_ids: str | list | None = config.get(ATTR_AREA_ID) - floor_ids: str | list | None = config.get(ATTR_FLOOR_ID) - label_ids: str | list | None = config.get(ATTR_LABEL_ID) - - self.entity_ids = ( - set(cv.ensure_list(entity_ids)) if _has_match(entity_ids) else set() - ) - self.device_ids = ( - set(cv.ensure_list(device_ids)) if _has_match(device_ids) else set() - ) - self.area_ids = set(cv.ensure_list(area_ids)) if _has_match(area_ids) else set() - self.floor_ids = ( - set(cv.ensure_list(floor_ids)) if _has_match(floor_ids) else set() - ) - self.label_ids = ( - set(cv.ensure_list(label_ids)) if _has_match(label_ids) else set() - ) - - @property - def has_any_selector(self) -> bool: - """Determine if any selectors are present.""" - return bool( - self.entity_ids - or self.device_ids - or self.area_ids - or self.floor_ids - or self.label_ids - ) - - -# TODO(abmantis): Since this is a copy from the service one, move it to a common place and use it in both places -@dataclasses.dataclass(slots=True) -class SelectedEntities: - """Class to hold the selected entities.""" - - # Entities that were explicitly mentioned. - referenced: set[str] = dataclasses.field(default_factory=set) - - # Entities that were referenced via device/area/floor/label ID. - # Should not trigger a warning when they don't exist. - indirectly_referenced: set[str] = dataclasses.field(default_factory=set) - - # Referenced items that could not be found. - missing_devices: set[str] = dataclasses.field(default_factory=set) - missing_areas: set[str] = dataclasses.field(default_factory=set) - missing_floors: set[str] = dataclasses.field(default_factory=set) - missing_labels: set[str] = dataclasses.field(default_factory=set) - - referenced_devices: set[str] = dataclasses.field(default_factory=set) - referenced_areas: set[str] = dataclasses.field(default_factory=set) - - def log_missing(self, missing_entities: set[str]) -> None: - """Log about missing items.""" - parts = [] - for label, items in ( - ("floors", self.missing_floors), - ("areas", self.missing_areas), - ("devices", self.missing_devices), - ("entities", missing_entities), - ("labels", self.missing_labels), - ): - if items: - parts.append(f"{label} {', '.join(sorted(items))}") - - if not parts: - return - - _LOGGER.warning( - "Referenced %s are missing or not currently available", - ", ".join(parts), - ) - - -# TODO(abmantis): Since this is a copy from the service one, move it to a common place and use it in both places -def async_extract_referenced_entity_ids( - hass: HomeAssistant, selector_data: TargetSelectorData, expand_group: bool = True -) -> SelectedEntities: - """Extract referenced entity IDs from a target selector.""" - selected = SelectedEntities() - - if not selector_data.has_any_selector: - return selected - - entity_ids: set[str] | list[str] = selector_data.entity_ids - if expand_group: - entity_ids = expand_entity_ids(hass, entity_ids) - - selected.referenced.update(entity_ids) - - if ( - not selector_data.device_ids - and not selector_data.area_ids - and not selector_data.floor_ids - and not selector_data.label_ids - ): - return selected - - entities = entity_registry.async_get(hass).entities - dev_reg = device_registry.async_get(hass) - area_reg = area_registry.async_get(hass) - - if selector_data.floor_ids: - floor_reg = floor_registry.async_get(hass) - for floor_id in selector_data.floor_ids: - if floor_id not in floor_reg.floors: - selected.missing_floors.add(floor_id) - - for area_id in selector_data.area_ids: - if area_id not in area_reg.areas: - selected.missing_areas.add(area_id) - - for device_id in selector_data.device_ids: - if device_id not in dev_reg.devices: - selected.missing_devices.add(device_id) - - if selector_data.label_ids: - label_reg = label_registry.async_get(hass) - for label_id in selector_data.label_ids: - if label_id not in label_reg.labels: - selected.missing_labels.add(label_id) - - for entity_entry in entities.get_entries_for_label(label_id): - if ( - entity_entry.entity_category is None - and entity_entry.hidden_by is None - ): - selected.indirectly_referenced.add(entity_entry.entity_id) - - for device_entry in dev_reg.devices.get_devices_for_label(label_id): - selected.referenced_devices.add(device_entry.id) - - for area_entry in area_reg.areas.get_areas_for_label(label_id): - selected.referenced_areas.add(area_entry.id) - - # Find areas for targeted floors - if selector_data.floor_ids: - selected.referenced_areas.update( - area_entry.id - for floor_id in selector_data.floor_ids - for area_entry in area_reg.areas.get_areas_for_floor(floor_id) - ) - - selected.referenced_areas.update(selector_data.area_ids) - selected.referenced_devices.update(selector_data.device_ids) - - if not selected.referenced_areas and not selected.referenced_devices: - return selected - - # Add indirectly referenced by device - selected.indirectly_referenced.update( - entry.entity_id - for device_id in selected.referenced_devices - for entry in entities.get_entries_for_device_id(device_id) - # Do not add entities which are hidden or which are config - # or diagnostic entities. - if (entry.entity_category is None and entry.hidden_by is None) - ) - - # Find devices for targeted areas - referenced_devices_by_area: set[str] = set() - if selected.referenced_areas: - for area_id in selected.referenced_areas: - referenced_devices_by_area.update( - device_entry.id - for device_entry in dev_reg.devices.get_devices_for_area_id(area_id) - ) - selected.referenced_devices.update(referenced_devices_by_area) - - # Add indirectly referenced by area - selected.indirectly_referenced.update( - entry.entity_id - for area_id in selected.referenced_areas - # The entity's area matches a targeted area - for entry in entities.get_entries_for_area_id(area_id) - # Do not add entities which are hidden or which are config - # or diagnostic entities. - if entry.entity_category is None and entry.hidden_by is None - ) - # Add indirectly referenced by area through device - selected.indirectly_referenced.update( - entry.entity_id - for device_id in referenced_devices_by_area - for entry in entities.get_entries_for_device_id(device_id) - # Do not add entities which are hidden or which are config - # or diagnostic entities. - if ( - entry.entity_category is None - and entry.hidden_by is None - and ( - # The entity's device matches a device referenced - # by an area and the entity - # has no explicitly set area - not entry.area_id - ) - ) - ) - - return selected - - class TargetSelectorStateChangeTracker: """Helper class to manage state change tracking for target selectors."""