Refactor/fix search component, including labels & floors support (#114206)

Co-authored-by: TheJulianJES <TheJulianJES@users.noreply.github.com>
Co-authored-by: Robert Resch <robert@resch.dev>
This commit is contained in:
Franck Nijhof 2024-03-27 09:39:05 +01:00 committed by GitHub
parent d8acd90370
commit dd2d79b77e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 1226 additions and 589 deletions

View File

@ -2,7 +2,9 @@
from __future__ import annotations
from collections import defaultdict, deque
from collections import defaultdict
from collections.abc import Iterable
from enum import StrEnum
import logging
from typing import Any
@ -12,6 +14,7 @@ from homeassistant.components import automation, group, person, script, websocke
from homeassistant.components.homeassistant import scene
from homeassistant.core import HomeAssistant, callback, split_entity_id
from homeassistant.helpers import (
area_registry as ar,
config_validation as cv,
device_registry as dr,
entity_registry as er,
@ -28,6 +31,25 @@ _LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
# enum of item types
class ItemType(StrEnum):
"""Item types."""
AREA = "area"
AUTOMATION = "automation"
AUTOMATION_BLUEPRINT = "automation_blueprint"
CONFIG_ENTRY = "config_entry"
DEVICE = "device"
ENTITY = "entity"
FLOOR = "floor"
GROUP = "group"
LABEL = "label"
PERSON = "person"
SCENE = "scene"
SCRIPT = "script"
SCRIPT_BLUEPRINT = "script_blueprint"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Search component."""
websocket_api.async_register_command(hass, websocket_search_related)
@ -37,21 +59,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@websocket_api.websocket_command(
{
vol.Required("type"): "search/related",
vol.Required("item_type"): vol.In(
(
"area",
"automation",
"automation_blueprint",
"config_entry",
"device",
"entity",
"group",
"person",
"scene",
"script",
"script_blueprint",
)
),
vol.Required("item_type"): vol.Coerce(ItemType),
vol.Required("item_id"): str,
}
)
@ -62,271 +70,520 @@ def websocket_search_related(
msg: dict[str, Any],
) -> None:
"""Handle search."""
searcher = Searcher(
hass,
dr.async_get(hass),
er.async_get(hass),
get_entity_sources(hass),
)
searcher = Searcher(hass, get_entity_sources(hass))
connection.send_result(
msg["id"], searcher.async_search(msg["item_type"], msg["item_id"])
)
class Searcher:
"""Find related things.
"""Find related things."""
Few rules:
Scenes, scripts, automations and config entries will only be expanded if they are
the entry point. They won't be expanded if we process them. This is because they
turn the results into garbage.
"""
# These types won't be further explored. Config entries + Output types.
DONT_RESOLVE = {
"area",
"automation",
"automation_blueprint",
"config_entry",
"group",
"scene",
"script",
"script_blueprint",
}
# These types exist as an entity and so need cleanup in results
EXIST_AS_ENTITY = {"automation", "group", "person", "scene", "script"}
def __init__(
self,
hass: HomeAssistant,
device_reg: dr.DeviceRegistry,
entity_reg: er.EntityRegistry,
entity_sources: dict[str, EntityInfo],
) -> None:
"""Search results."""
self.hass = hass
self._device_reg = device_reg
self._entity_reg = entity_reg
self._sources = entity_sources
self.results: defaultdict[str, set[str]] = defaultdict(set)
self._to_resolve: deque[tuple[str, str]] = deque()
self._area_registry = ar.async_get(hass)
self._device_registry = dr.async_get(hass)
self._entity_registry = er.async_get(hass)
self._entity_sources = entity_sources
self.results: defaultdict[ItemType, set[str]] = defaultdict(set)
@callback
def async_search(self, item_type: str, item_id: str) -> dict[str, set[str]]:
def async_search(self, item_type: ItemType, item_id: str) -> dict[str, set[str]]:
"""Find results."""
_LOGGER.debug("Searching for %s/%s", item_type, item_id)
self.results[item_type].add(item_id)
self._to_resolve.append((item_type, item_id))
getattr(self, f"_async_search_{item_type}")(item_id)
while self._to_resolve:
search_type, search_id = self._to_resolve.popleft()
getattr(self, f"_resolve_{search_type}")(search_id)
# Clean up entity_id items, from the general "entity" type result,
# that are also found in the specific entity domain type.
for result_type in self.EXIST_AS_ENTITY:
self.results["entity"] -= self.results[result_type]
# Remove entry into graph from search results.
to_remove_item_type = item_type
if item_type == "entity":
domain = split_entity_id(item_id)[0]
if domain in self.EXIST_AS_ENTITY:
to_remove_item_type = domain
self.results[to_remove_item_type].remove(item_id)
# Remove the original requested item from the results (if present)
if item_type in self.results and item_id in self.results[item_type]:
self.results[item_type].remove(item_id)
# Filter out empty sets.
return {key: val for key, val in self.results.items() if val}
@callback
def _add_or_resolve(self, item_type: str, item_id: str) -> None:
"""Add an item to explore."""
if item_id in self.results[item_type]:
def _add(self, item_type: ItemType, item_id: str | Iterable[str] | None) -> None:
"""Add an item (or items) to the results."""
if item_id is None:
return
self.results[item_type].add(item_id)
if item_type not in self.DONT_RESOLVE:
self._to_resolve.append((item_type, item_id))
if isinstance(item_id, str):
self.results[item_type].add(item_id)
else:
self.results[item_type].update(item_id)
@callback
def _resolve_area(self, area_id: str) -> None:
"""Resolve an area."""
for device in dr.async_entries_for_area(self._device_reg, area_id):
self._add_or_resolve("device", device.id)
def _async_search_area(self, area_id: str, *, entry_point: bool = True) -> None:
"""Find results for an area."""
if not (area_entry := self._async_resolve_up_area(area_id)):
return
for entity_entry in er.async_entries_for_area(self._entity_reg, area_id):
self._add_or_resolve("entity", entity_entry.entity_id)
if entry_point:
# Add labels of this area
self._add(ItemType.LABEL, area_entry.labels)
for entity_id in script.scripts_with_area(self.hass, area_id):
self._add_or_resolve("entity", entity_id)
# Automations referencing this area
self._add(
ItemType.AUTOMATION, automation.automations_with_area(self.hass, area_id)
)
for entity_id in automation.automations_with_area(self.hass, area_id):
self._add_or_resolve("entity", entity_id)
# Scripts referencing this area
self._add(ItemType.SCRIPT, script.scripts_with_area(self.hass, area_id))
# Devices in this area
for device in dr.async_entries_for_area(self._device_registry, area_id):
self._add(ItemType.DEVICE, device.id)
# Config entries for devices in this area
if device_entry := self._device_registry.async_get(device.id):
self._add(ItemType.CONFIG_ENTRY, device_entry.config_entries)
# Automations referencing this device
self._add(
ItemType.AUTOMATION,
automation.automations_with_device(self.hass, device.id),
)
# Scripts referencing this device
self._add(ItemType.SCRIPT, script.scripts_with_device(self.hass, device.id))
# Entities of this device
for entity_entry in er.async_entries_for_device(
self._entity_registry, device.id
):
# Skip the entity if it's in a different area
if entity_entry.area_id is not None:
continue
self._add(ItemType.ENTITY, entity_entry.entity_id)
# Entities in this area
for entity_entry in er.async_entries_for_area(self._entity_registry, area_id):
self._add(ItemType.ENTITY, entity_entry.entity_id)
# If this entity also exists as a resource, we add it.
if entity_entry.domain in self.EXIST_AS_ENTITY:
self._add(ItemType(entity_entry.domain), entity_entry.entity_id)
# Automations referencing this entity
self._add(
ItemType.AUTOMATION,
automation.automations_with_entity(self.hass, entity_entry.entity_id),
)
# Scripts referencing this entity
self._add(
ItemType.SCRIPT,
script.scripts_with_entity(self.hass, entity_entry.entity_id),
)
# Groups that have this entity as a member
self._add(
ItemType.GROUP,
group.groups_with_entity(self.hass, entity_entry.entity_id),
)
# Persons that use this entity
self._add(
ItemType.PERSON,
person.persons_with_entity(self.hass, entity_entry.entity_id),
)
# Scenes that reference this entity
self._add(
ItemType.SCENE,
scene.scenes_with_entity(self.hass, entity_entry.entity_id),
)
# Config entries for entities in this area
self._add(ItemType.CONFIG_ENTRY, entity_entry.config_entry_id)
@callback
def _resolve_automation(self, automation_entity_id: str) -> None:
"""Resolve an automation.
def _async_search_automation(self, automation_entity_id: str) -> None:
"""Find results for an automation."""
# Up resolve the automation entity itself
if entity_entry := self._async_resolve_up_entity(automation_entity_id):
# Add labels of this automation entity
self._add(ItemType.LABEL, entity_entry.labels)
Will only be called if automation is an entry point.
"""
for entity in automation.entities_in_automation(
self.hass, automation_entity_id
):
self._add_or_resolve("entity", entity)
# Find the blueprint used in this automation
self._add(
ItemType.AUTOMATION_BLUEPRINT,
automation.blueprint_in_automation(self.hass, automation_entity_id),
)
for device in automation.devices_in_automation(self.hass, automation_entity_id):
self._add_or_resolve("device", device)
# Floors referenced in this automation
self._add(
ItemType.FLOOR,
automation.floors_in_automation(self.hass, automation_entity_id),
)
# Areas referenced in this automation
for area in automation.areas_in_automation(self.hass, automation_entity_id):
self._add_or_resolve("area", area)
self._add(ItemType.AREA, area)
self._async_resolve_up_area(area)
if blueprint := automation.blueprint_in_automation(
# Devices referenced in this automation
for device in automation.devices_in_automation(self.hass, automation_entity_id):
self._add(ItemType.DEVICE, device)
self._async_resolve_up_device(device)
# Entities referenced in this automation
for entity_id in automation.entities_in_automation(
self.hass, automation_entity_id
):
self._add_or_resolve("automation_blueprint", blueprint)
self._add(ItemType.ENTITY, entity_id)
self._async_resolve_up_entity(entity_id)
# If this entity also exists as a resource, we add it.
domain = split_entity_id(entity_id)[0]
if domain in self.EXIST_AS_ENTITY:
self._add(ItemType(domain), entity_id)
# For an automation, we want to unwrap the groups, to ensure we
# relate this automation to all those members as well.
if domain == "group":
for group_entity_id in group.get_entity_ids(self.hass, entity_id):
self._add(ItemType.ENTITY, group_entity_id)
self._async_resolve_up_entity(group_entity_id)
# For an automation, we want to unwrap the scenes, to ensure we
# relate this automation to all referenced entities as well.
if domain == "scene":
for scene_entity_id in scene.entities_in_scene(self.hass, entity_id):
self._add(ItemType.ENTITY, scene_entity_id)
self._async_resolve_up_entity(scene_entity_id)
# Fully search the script if it is part of an automation.
# This makes the automation return all results of the embedded script.
if domain == "script":
self._async_search_script(entity_id, entry_point=False)
@callback
def _resolve_automation_blueprint(self, blueprint_path: str) -> None:
"""Resolve an automation blueprint.
Will only be called if blueprint is an entry point.
"""
for entity_id in automation.automations_with_blueprint(
self.hass, blueprint_path
):
self._add_or_resolve("automation", entity_id)
def _async_search_automation_blueprint(self, blueprint_path: str) -> None:
"""Find results for an automation blueprint."""
self._add(
ItemType.AUTOMATION,
automation.automations_with_blueprint(self.hass, blueprint_path),
)
@callback
def _resolve_config_entry(self, config_entry_id: str) -> None:
"""Resolve a config entry.
Will only be called if config entry is an entry point.
"""
def _async_search_config_entry(self, config_entry_id: str) -> None:
"""Find results for a config entry."""
for device_entry in dr.async_entries_for_config_entry(
self._device_reg, config_entry_id
self._device_registry, config_entry_id
):
self._add_or_resolve("device", device_entry.id)
self._add(ItemType.DEVICE, device_entry.id)
self._async_search_device(device_entry.id, entry_point=False)
for entity_entry in er.async_entries_for_config_entry(
self._entity_reg, config_entry_id
self._entity_registry, config_entry_id
):
self._add_or_resolve("entity", entity_entry.entity_id)
self._add(ItemType.ENTITY, entity_entry.entity_id)
self._async_search_entity(entity_entry.entity_id, entry_point=False)
@callback
def _resolve_device(self, device_id: str) -> None:
"""Resolve a device."""
device_entry = self._device_reg.async_get(device_id)
# Unlikely entry doesn't exist, but let's guard for bad data.
if device_entry is not None:
if device_entry.area_id:
self._add_or_resolve("area", device_entry.area_id)
def _async_search_device(self, device_id: str, *, entry_point: bool = True) -> None:
"""Find results for a device."""
if not (device_entry := self._async_resolve_up_device(device_id)):
return
for config_entry_id in device_entry.config_entries:
self._add_or_resolve("config_entry", config_entry_id)
if entry_point:
# Add labels of this device
self._add(ItemType.LABEL, device_entry.labels)
# We do not resolve device_entry.via_device_id because that
# device is not related data-wise inside HA.
# Automations referencing this device
self._add(
ItemType.AUTOMATION,
automation.automations_with_device(self.hass, device_id),
)
for entity_entry in er.async_entries_for_device(self._entity_reg, device_id):
self._add_or_resolve("entity", entity_entry.entity_id)
# Scripts referencing this device
self._add(ItemType.SCRIPT, script.scripts_with_device(self.hass, device_id))
for entity_id in script.scripts_with_device(self.hass, device_id):
self._add_or_resolve("entity", entity_id)
for entity_id in automation.automations_with_device(self.hass, device_id):
self._add_or_resolve("entity", entity_id)
# Entities of this device
for entity_entry in er.async_entries_for_device(
self._entity_registry, device_id
):
self._add(ItemType.ENTITY, entity_entry.entity_id)
# Add all entity information as well
self._async_search_entity(entity_entry.entity_id, entry_point=False)
@callback
def _resolve_entity(self, entity_id: str) -> None:
"""Resolve an entity."""
# Extra: Find automations and scripts that reference this entity.
def _async_search_entity(self, entity_id: str, *, entry_point: bool = True) -> None:
"""Find results for an entity."""
# Resolve up the entity itself
entity_entry = self._async_resolve_up_entity(entity_id)
for entity in scene.scenes_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
if entity_entry and entry_point:
# Add labels of this entity
self._add(ItemType.LABEL, entity_entry.labels)
for entity in group.groups_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
# Automations referencing this entity
self._add(
ItemType.AUTOMATION,
automation.automations_with_entity(self.hass, entity_id),
)
for entity in automation.automations_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
# Scripts referencing this entity
self._add(ItemType.SCRIPT, script.scripts_with_entity(self.hass, entity_id))
for entity in script.scripts_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
# Groups that have this entity as a member
self._add(ItemType.GROUP, group.groups_with_entity(self.hass, entity_id))
for entity in person.persons_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
# Persons referencing this entity
self._add(ItemType.PERSON, person.persons_with_entity(self.hass, entity_id))
# Find devices
entity_entry = self._entity_reg.async_get(entity_id)
if entity_entry is not None:
if entity_entry.device_id:
self._add_or_resolve("device", entity_entry.device_id)
if entity_entry.config_entry_id is not None:
self._add_or_resolve("config_entry", entity_entry.config_entry_id)
else:
source = self._sources.get(entity_id)
if source is not None and "config_entry" in source:
self._add_or_resolve("config_entry", source["config_entry"])
domain = split_entity_id(entity_id)[0]
if domain in self.EXIST_AS_ENTITY:
self._add_or_resolve(domain, entity_id)
# Scenes referencing this entity
self._add(ItemType.SCENE, scene.scenes_with_entity(self.hass, entity_id))
@callback
def _resolve_group(self, group_entity_id: str) -> None:
"""Resolve a group.
def _async_search_floor(self, floor_id: str) -> None:
"""Find results for a floor."""
# Automations referencing this floor
self._add(
ItemType.AUTOMATION,
automation.automations_with_floor(self.hass, floor_id),
)
Will only be called if group is an entry point.
# Scripts referencing this floor
self._add(ItemType.SCRIPT, script.scripts_with_floor(self.hass, floor_id))
for area_entry in ar.async_entries_for_floor(self._area_registry, floor_id):
self._add(ItemType.AREA, area_entry.id)
self._async_search_area(area_entry.id, entry_point=False)
@callback
def _async_search_group(self, group_entity_id: str) -> None:
"""Find results for a group.
Note: We currently only support the classic groups, thus
we don't look up the area/floor for a group entity.
"""
# Automations referencing this group
self._add(
ItemType.AUTOMATION,
automation.automations_with_entity(self.hass, group_entity_id),
)
# Scripts referencing this group
self._add(
ItemType.SCRIPT, script.scripts_with_entity(self.hass, group_entity_id)
)
# Scenes that reference this group
self._add(ItemType.SCENE, scene.scenes_with_entity(self.hass, group_entity_id))
# Entities in this group
for entity_id in group.get_entity_ids(self.hass, group_entity_id):
self._add_or_resolve("entity", entity_id)
self._add(ItemType.ENTITY, entity_id)
self._async_resolve_up_entity(entity_id)
@callback
def _resolve_person(self, person_entity_id: str) -> None:
"""Resolve a person.
def _async_search_label(self, label_id: str) -> None:
"""Find results for a label."""
Will only be called if person is an entry point.
"""
for entity in person.entities_in_person(self.hass, person_entity_id):
self._add_or_resolve("entity", entity)
# Areas with this label
for area_entry in ar.async_entries_for_label(self._area_registry, label_id):
self._add(ItemType.AREA, area_entry.id)
# Devices with this label
for device in dr.async_entries_for_label(self._device_registry, label_id):
self._add(ItemType.DEVICE, device.id)
# Entities with this label
for entity_entry in er.async_entries_for_label(self._entity_registry, label_id):
self._add(ItemType.ENTITY, entity_entry.entity_id)
# If this entity also exists as a resource, we add it.
domain = split_entity_id(entity_entry.entity_id)[0]
if domain in self.EXIST_AS_ENTITY:
self._add(ItemType(domain), entity_entry.entity_id)
# Automations referencing this label
self._add(
ItemType.AUTOMATION,
automation.automations_with_label(self.hass, label_id),
)
# Scripts referencing this label
self._add(ItemType.SCRIPT, script.scripts_with_label(self.hass, label_id))
@callback
def _resolve_scene(self, scene_entity_id: str) -> None:
"""Resolve a scene.
def _async_search_person(self, person_entity_id: str) -> None:
"""Find results for a person."""
# Up resolve the scene entity itself
if entity_entry := self._async_resolve_up_entity(person_entity_id):
# Add labels of this person entity
self._add(ItemType.LABEL, entity_entry.labels)
Will only be called if scene is an entry point.
"""
# Automations referencing this person
self._add(
ItemType.AUTOMATION,
automation.automations_with_entity(self.hass, person_entity_id),
)
# Scripts referencing this person
self._add(
ItemType.SCRIPT, script.scripts_with_entity(self.hass, person_entity_id)
)
# Add all member entities of this person
self._add(
ItemType.ENTITY, person.entities_in_person(self.hass, person_entity_id)
)
@callback
def _async_search_scene(self, scene_entity_id: str) -> None:
"""Find results for a scene."""
# Up resolve the scene entity itself
if entity_entry := self._async_resolve_up_entity(scene_entity_id):
# Add labels of this scene entity
self._add(ItemType.LABEL, entity_entry.labels)
# Automations referencing this scene
self._add(
ItemType.AUTOMATION,
automation.automations_with_entity(self.hass, scene_entity_id),
)
# Scripts referencing this scene
self._add(
ItemType.SCRIPT, script.scripts_with_entity(self.hass, scene_entity_id)
)
# Add all entities in this scene
for entity in scene.entities_in_scene(self.hass, scene_entity_id):
self._add_or_resolve("entity", entity)
self._add(ItemType.ENTITY, entity)
self._async_resolve_up_entity(entity)
@callback
def _resolve_script(self, script_entity_id: str) -> None:
"""Resolve a script.
def _async_search_script(
self, script_entity_id: str, *, entry_point: bool = True
) -> None:
"""Find results for a script."""
# Up resolve the script entity itself
entity_entry = self._async_resolve_up_entity(script_entity_id)
Will only be called if script is an entry point.
"""
for entity in script.entities_in_script(self.hass, script_entity_id):
self._add_or_resolve("entity", entity)
if entity_entry and entry_point:
# Add labels of this script entity
self._add(ItemType.LABEL, entity_entry.labels)
for device in script.devices_in_script(self.hass, script_entity_id):
self._add_or_resolve("device", device)
# Find the blueprint used in this script
self._add(
ItemType.SCRIPT_BLUEPRINT,
script.blueprint_in_script(self.hass, script_entity_id),
)
# Floors referenced in this script
self._add(ItemType.FLOOR, script.floors_in_script(self.hass, script_entity_id))
# Areas referenced in this script
for area in script.areas_in_script(self.hass, script_entity_id):
self._add_or_resolve("area", area)
self._add(ItemType.AREA, area)
self._async_resolve_up_area(area)
if blueprint := script.blueprint_in_script(self.hass, script_entity_id):
self._add_or_resolve("script_blueprint", blueprint)
# Devices referenced in this script
for device in script.devices_in_script(self.hass, script_entity_id):
self._add(ItemType.DEVICE, device)
self._async_resolve_up_device(device)
# Entities referenced in this script
for entity_id in script.entities_in_script(self.hass, script_entity_id):
self._add(ItemType.ENTITY, entity_id)
self._async_resolve_up_entity(entity_id)
# If this entity also exists as a resource, we add it.
domain = split_entity_id(entity_id)[0]
if domain in self.EXIST_AS_ENTITY:
self._add(ItemType(domain), entity_id)
# For an script, we want to unwrap the groups, to ensure we
# relate this script to all those members as well.
if domain == "group":
for group_entity_id in group.get_entity_ids(self.hass, entity_id):
self._add(ItemType.ENTITY, group_entity_id)
self._async_resolve_up_entity(group_entity_id)
# For an script, we want to unwrap the scenes, to ensure we
# relate this script to all referenced entities as well.
if domain == "scene":
for scene_entity_id in scene.entities_in_scene(self.hass, entity_id):
self._add(ItemType.ENTITY, scene_entity_id)
self._async_resolve_up_entity(scene_entity_id)
# Fully search the script if it is nested.
# This makes the script return all results of the embedded script.
if domain == "script":
self._async_search_script(entity_id, entry_point=False)
@callback
def _resolve_script_blueprint(self, blueprint_path: str) -> None:
"""Resolve a script blueprint.
def _async_search_script_blueprint(self, blueprint_path: str) -> None:
"""Find results for a script blueprint."""
self._add(
ItemType.SCRIPT, script.scripts_with_blueprint(self.hass, blueprint_path)
)
Will only be called if blueprint is an entry point.
@callback
def _async_resolve_up_device(self, device_id: str) -> dr.DeviceEntry | None:
"""Resolve up from a device.
Above a device is an area or floor.
Above a device is also the config entry.
"""
for entity_id in script.scripts_with_blueprint(self.hass, blueprint_path):
self._add_or_resolve("script", entity_id)
if device_entry := self._device_registry.async_get(device_id):
if device_entry.area_id:
self._add(ItemType.AREA, device_entry.area_id)
self._async_resolve_up_area(device_entry.area_id)
self._add(ItemType.CONFIG_ENTRY, device_entry.config_entries)
return device_entry
@callback
def _async_resolve_up_entity(self, entity_id: str) -> er.RegistryEntry | None:
"""Resolve up from an entity.
Above an entity is a device, area or floor.
Above an entity is also the config entry.
"""
if entity_entry := self._entity_registry.async_get(entity_id):
# Entity has an overridden area
if entity_entry.area_id:
self._add(ItemType.AREA, entity_entry.area_id)
self._async_resolve_up_area(entity_entry.area_id)
# Inherit area from device
elif entity_entry.device_id and (
device_entry := self._device_registry.async_get(entity_entry.device_id)
):
if device_entry.area_id:
self._add(ItemType.AREA, device_entry.area_id)
self._async_resolve_up_area(device_entry.area_id)
# Add device that provided this entity
self._add(ItemType.DEVICE, entity_entry.device_id)
# Add config entry that provided this entity
self._add(ItemType.CONFIG_ENTRY, entity_entry.config_entry_id)
elif source := self._entity_sources.get(entity_id):
# Add config entry that provided this entity
self._add(ItemType.CONFIG_ENTRY, source.get("config_entry"))
return entity_entry
@callback
def _async_resolve_up_area(self, area_id: str) -> ar.AreaEntry | None:
"""Resolve up from an area.
Above an area can be a floor.
"""
if area_entry := self._area_registry.async_get_area(area_id):
self._add(ItemType.FLOOR, area_entry.floor_id)
return area_entry

File diff suppressed because it is too large Load Diff