mirror of
https://github.com/home-assistant/core.git
synced 2025-07-19 11:17:21 +00:00
Improve dependencies resolution (#138502)
* Improve dependencies resolution * Improve tests * Better docstrings * Fix comment * Improve tests * Improve logging * Address feedback * Address feedback * Address feedback * Address feedback * Address feedback * Simplify error handling * small log change * Add comment * Address feedback * shorter comments * Add test
This commit is contained in:
parent
52408e67b2
commit
4f25296c50
@ -93,6 +93,7 @@ from .helpers.dispatcher import async_dispatcher_send_internal
|
||||
from .helpers.storage import get_internal_store_manager
|
||||
from .helpers.system_info import async_get_system_info
|
||||
from .helpers.typing import ConfigType
|
||||
from .loader import Integration
|
||||
from .setup import (
|
||||
# _setup_started is marked as protected to make it clear
|
||||
# that it is not part of the public API and should not be used
|
||||
@ -711,20 +712,25 @@ def _get_domains(hass: core.HomeAssistant, config: dict[str, Any]) -> set[str]:
|
||||
return domains
|
||||
|
||||
|
||||
async def _async_resolve_domains_to_setup(
|
||||
async def _async_resolve_domains_and_preload(
|
||||
hass: core.HomeAssistant, config: dict[str, Any]
|
||||
) -> tuple[set[str], dict[str, loader.Integration]]:
|
||||
"""Resolve all dependencies and return list of domains to set up."""
|
||||
) -> tuple[dict[str, Integration], dict[str, Integration]]:
|
||||
"""Resolve all dependencies and return integrations to set up.
|
||||
|
||||
The return value is a tuple of two dictionaries:
|
||||
- The first dictionary contains integrations
|
||||
specified by the configuration (including config entries).
|
||||
- The second dictionary contains the same integrations as the first dictionary
|
||||
together with all their dependencies.
|
||||
"""
|
||||
domains_to_setup = _get_domains(hass, config)
|
||||
needed_requirements: set[str] = set()
|
||||
platform_integrations = conf_util.extract_platform_integrations(
|
||||
config, BASE_PLATFORMS
|
||||
)
|
||||
# Ensure base platforms that have platform integrations are added to
|
||||
# to `domains_to_setup so they can be setup first instead of
|
||||
# discovering them when later when a config entry setup task
|
||||
# notices its needed and there is already a long line to use
|
||||
# the import executor.
|
||||
# Ensure base platforms that have platform integrations are added to `domains`,
|
||||
# so they can be setup first instead of discovering them later when a config
|
||||
# entry setup task notices that it's needed and there is already a long line
|
||||
# to use the import executor.
|
||||
#
|
||||
# For example if we have
|
||||
# sensor:
|
||||
@ -740,111 +746,78 @@ async def _async_resolve_domains_to_setup(
|
||||
# so this will be less of a problem in the future.
|
||||
domains_to_setup.update(platform_integrations)
|
||||
|
||||
# Load manifests for base platforms and platform based integrations
|
||||
# that are defined under base platforms right away since we do not require
|
||||
# the manifest to list them as dependencies and we want to avoid the lock
|
||||
# contention when multiple integrations try to load them at once
|
||||
additional_manifests_to_load = {
|
||||
# Additionally process base platforms since we do not require the manifest
|
||||
# to list them as dependencies.
|
||||
# We want to later avoid lock contention when multiple integrations try to load
|
||||
# their manifests at once.
|
||||
# Also process integrations that are defined under base platforms
|
||||
# to speed things up.
|
||||
additional_domains_to_process = {
|
||||
*BASE_PLATFORMS,
|
||||
*chain.from_iterable(platform_integrations.values()),
|
||||
}
|
||||
|
||||
translations_to_load = additional_manifests_to_load.copy()
|
||||
|
||||
# Resolve all dependencies so we know all integrations
|
||||
# that will have to be loaded and start right-away
|
||||
integration_cache: dict[str, loader.Integration] = {}
|
||||
to_resolve: set[str] = domains_to_setup
|
||||
while to_resolve or additional_manifests_to_load:
|
||||
old_to_resolve: set[str] = to_resolve
|
||||
to_resolve = set()
|
||||
integrations_or_excs = await loader.async_get_integrations(
|
||||
hass, {*domains_to_setup, *additional_domains_to_process}
|
||||
)
|
||||
# Eliminate those missing or with invalid manifest
|
||||
integrations_to_process = {
|
||||
domain: itg
|
||||
for domain, itg in integrations_or_excs.items()
|
||||
if isinstance(itg, Integration)
|
||||
}
|
||||
integrations_dependencies = await loader.resolve_integrations_dependencies(
|
||||
hass, integrations_to_process.values()
|
||||
)
|
||||
# Eliminate those without valid dependencies
|
||||
integrations_to_process = {
|
||||
domain: integrations_to_process[domain] for domain in integrations_dependencies
|
||||
}
|
||||
|
||||
if additional_manifests_to_load:
|
||||
to_get = {*old_to_resolve, *additional_manifests_to_load}
|
||||
additional_manifests_to_load.clear()
|
||||
else:
|
||||
to_get = old_to_resolve
|
||||
integrations_to_setup = {
|
||||
domain: itg
|
||||
for domain, itg in integrations_to_process.items()
|
||||
if domain in domains_to_setup
|
||||
}
|
||||
all_integrations_to_setup = integrations_to_setup.copy()
|
||||
all_integrations_to_setup.update(
|
||||
(dep, loader.async_get_loaded_integration(hass, dep))
|
||||
for domain in integrations_to_setup
|
||||
for dep in integrations_dependencies[domain].difference(
|
||||
all_integrations_to_setup
|
||||
)
|
||||
)
|
||||
|
||||
manifest_deps: set[str] = set()
|
||||
resolve_dependencies_tasks: list[asyncio.Task[bool]] = []
|
||||
integrations_to_process: list[loader.Integration] = []
|
||||
|
||||
for domain, itg in (await loader.async_get_integrations(hass, to_get)).items():
|
||||
if not isinstance(itg, loader.Integration):
|
||||
continue
|
||||
integration_cache[domain] = itg
|
||||
needed_requirements.update(itg.requirements)
|
||||
|
||||
# Make sure manifests for dependencies are loaded in the next
|
||||
# loop to try to group as many as manifest loads in a single
|
||||
# call to avoid the creating one-off executor jobs later in
|
||||
# the setup process
|
||||
additional_manifests_to_load.update(
|
||||
dep
|
||||
for dep in chain(itg.dependencies, itg.after_dependencies)
|
||||
if dep not in integration_cache
|
||||
)
|
||||
|
||||
if domain not in old_to_resolve:
|
||||
continue
|
||||
|
||||
integrations_to_process.append(itg)
|
||||
manifest_deps.update(itg.dependencies)
|
||||
manifest_deps.update(itg.after_dependencies)
|
||||
if not itg.all_dependencies_resolved:
|
||||
resolve_dependencies_tasks.append(
|
||||
create_eager_task(
|
||||
itg.resolve_dependencies(),
|
||||
name=f"resolve dependencies {domain}",
|
||||
loop=hass.loop,
|
||||
)
|
||||
)
|
||||
|
||||
if unseen_deps := manifest_deps - integration_cache.keys():
|
||||
# If there are dependencies, try to preload all
|
||||
# the integrations manifest at once and add them
|
||||
# to the list of requirements we need to install
|
||||
# so we can try to check if they are already installed
|
||||
# in a single call below which avoids each integration
|
||||
# having to wait for the lock to do it individually
|
||||
deps = await loader.async_get_integrations(hass, unseen_deps)
|
||||
for dependant_domain, dependant_itg in deps.items():
|
||||
if isinstance(dependant_itg, loader.Integration):
|
||||
integration_cache[dependant_domain] = dependant_itg
|
||||
needed_requirements.update(dependant_itg.requirements)
|
||||
|
||||
if resolve_dependencies_tasks:
|
||||
await asyncio.gather(*resolve_dependencies_tasks)
|
||||
|
||||
for itg in integrations_to_process:
|
||||
try:
|
||||
all_deps = itg.all_dependencies
|
||||
except RuntimeError:
|
||||
# Integration.all_dependencies raises RuntimeError if
|
||||
# dependencies could not be resolved
|
||||
continue
|
||||
for dep in all_deps:
|
||||
if dep in domains_to_setup:
|
||||
continue
|
||||
domains_to_setup.add(dep)
|
||||
to_resolve.add(dep)
|
||||
|
||||
_LOGGER.info("Domains to be set up: %s", domains_to_setup)
|
||||
# Gather requirements for all integrations,
|
||||
# their dependencies and after dependencies.
|
||||
# To gather all the requirements we must ignore exceptions here.
|
||||
# The exceptions will be detected and handled later in the bootstrap process.
|
||||
integrations_after_dependencies = (
|
||||
await loader.resolve_integrations_after_dependencies(
|
||||
hass, integrations_to_process.values(), ignore_exceptions=True
|
||||
)
|
||||
)
|
||||
integrations_requirements = {
|
||||
domain: itg.requirements for domain, itg in integrations_to_process.items()
|
||||
}
|
||||
integrations_requirements.update(
|
||||
(dep, loader.async_get_loaded_integration(hass, dep).requirements)
|
||||
for deps in integrations_after_dependencies.values()
|
||||
for dep in deps.difference(integrations_requirements)
|
||||
)
|
||||
all_requirements = set(chain.from_iterable(integrations_requirements.values()))
|
||||
|
||||
# Optimistically check if requirements are already installed
|
||||
# ahead of setting up the integrations so we can prime the cache
|
||||
# We do not wait for this since its an optimization only
|
||||
# We do not wait for this since it's an optimization only
|
||||
hass.async_create_background_task(
|
||||
requirements.async_load_installed_versions(hass, needed_requirements),
|
||||
requirements.async_load_installed_versions(hass, all_requirements),
|
||||
"check installed requirements",
|
||||
eager_start=True,
|
||||
)
|
||||
|
||||
#
|
||||
# Only add the domains_to_setup after we finish resolving
|
||||
# as new domains are likely to added in the process
|
||||
#
|
||||
translations_to_load.update(domains_to_setup)
|
||||
# Start loading translations for all integrations we are going to set up
|
||||
# in the background so they are ready when we need them. This avoids a
|
||||
# lot of waiting for the translation load lock and a thundering herd of
|
||||
@ -855,6 +828,7 @@ async def _async_resolve_domains_to_setup(
|
||||
# hold the translation load lock and if anything is fast enough to
|
||||
# wait for the translation load lock, loading will be done by the
|
||||
# time it gets to it.
|
||||
translations_to_load = {*all_integrations_to_setup, *additional_domains_to_process}
|
||||
hass.async_create_background_task(
|
||||
translation.async_load_integrations(hass, translations_to_load),
|
||||
"load translations",
|
||||
@ -866,13 +840,13 @@ async def _async_resolve_domains_to_setup(
|
||||
# in the setup process.
|
||||
hass.async_create_background_task(
|
||||
get_internal_store_manager(hass).async_preload(
|
||||
[*PRELOAD_STORAGE, *domains_to_setup]
|
||||
[*PRELOAD_STORAGE, *all_integrations_to_setup]
|
||||
),
|
||||
"preload storage",
|
||||
eager_start=True,
|
||||
)
|
||||
|
||||
return domains_to_setup, integration_cache
|
||||
return integrations_to_setup, all_integrations_to_setup
|
||||
|
||||
|
||||
async def _async_set_up_integrations(
|
||||
@ -882,69 +856,90 @@ async def _async_set_up_integrations(
|
||||
watcher = _WatchPendingSetups(hass, _setup_started(hass))
|
||||
watcher.async_start()
|
||||
|
||||
domains_to_setup, integration_cache = await _async_resolve_domains_to_setup(
|
||||
integrations, all_integrations = await _async_resolve_domains_and_preload(
|
||||
hass, config
|
||||
)
|
||||
stage_2_domains = domains_to_setup.copy()
|
||||
all_domains = set(all_integrations)
|
||||
domains = set(integrations)
|
||||
|
||||
_LOGGER.info(
|
||||
"Domains to be set up: %s | %s",
|
||||
domains,
|
||||
all_domains - domains,
|
||||
)
|
||||
|
||||
# Initialize recorder
|
||||
if "recorder" in domains_to_setup:
|
||||
if "recorder" in all_domains:
|
||||
recorder.async_initialize_recorder(hass)
|
||||
|
||||
# Initialize backup
|
||||
if "backup" in domains_to_setup:
|
||||
if "backup" in all_domains:
|
||||
backup.async_initialize_backup(hass)
|
||||
|
||||
stage_0_and_1_domains: list[tuple[str, set[str], int | None]] = [
|
||||
stages: list[tuple[str, set[str], int | None]] = [
|
||||
*(
|
||||
(name, domain_group & domains_to_setup, timeout)
|
||||
(name, domain_group, timeout)
|
||||
for name, domain_group, timeout in STAGE_0_INTEGRATIONS
|
||||
),
|
||||
("stage 1", STAGE_1_INTEGRATIONS & domains_to_setup, STAGE_1_TIMEOUT),
|
||||
("1", STAGE_1_INTEGRATIONS, STAGE_1_TIMEOUT),
|
||||
("2", domains, STAGE_2_TIMEOUT),
|
||||
]
|
||||
|
||||
_LOGGER.info("Setting up stage 0 and 1")
|
||||
for name, domain_group, timeout in stage_0_and_1_domains:
|
||||
if not domain_group:
|
||||
_LOGGER.info("Setting up stage 0")
|
||||
for name, domain_group, timeout in stages:
|
||||
stage_domains_unfiltered = domain_group & all_domains
|
||||
if not stage_domains_unfiltered:
|
||||
_LOGGER.info("Nothing to set up in stage %s: %s", name, domain_group)
|
||||
continue
|
||||
|
||||
_LOGGER.info("Setting up %s: %s", name, domain_group)
|
||||
to_be_loaded = domain_group.copy()
|
||||
to_be_loaded.update(
|
||||
stage_domains = stage_domains_unfiltered - hass.config.components
|
||||
if not stage_domains:
|
||||
_LOGGER.info("Already set up stage %s: %s", name, stage_domains_unfiltered)
|
||||
continue
|
||||
|
||||
stage_dep_domains_unfiltered = {
|
||||
dep
|
||||
for domain in domain_group
|
||||
if (integration := integration_cache.get(domain)) is not None
|
||||
for dep in integration.all_dependencies
|
||||
for domain in stage_domains
|
||||
for dep in all_integrations[domain].all_dependencies
|
||||
if dep not in stage_domains
|
||||
}
|
||||
stage_dep_domains = stage_dep_domains_unfiltered - hass.config.components
|
||||
|
||||
stage_all_domains = stage_domains | stage_dep_domains
|
||||
stage_all_integrations = {
|
||||
domain: all_integrations[domain] for domain in stage_all_domains
|
||||
}
|
||||
# Detect all cycles
|
||||
stage_integrations_after_dependencies = (
|
||||
await loader.resolve_integrations_after_dependencies(
|
||||
hass, stage_all_integrations.values(), stage_all_domains
|
||||
)
|
||||
)
|
||||
async_set_domains_to_be_loaded(hass, to_be_loaded)
|
||||
stage_2_domains -= to_be_loaded
|
||||
stage_all_domains = set(stage_integrations_after_dependencies)
|
||||
stage_domains &= stage_all_domains
|
||||
stage_dep_domains &= stage_all_domains
|
||||
|
||||
_LOGGER.info(
|
||||
"Setting up stage %s: %s | %s\nDependencies: %s | %s",
|
||||
name,
|
||||
stage_domains,
|
||||
stage_domains_unfiltered - stage_domains,
|
||||
stage_dep_domains,
|
||||
stage_dep_domains_unfiltered - stage_dep_domains,
|
||||
)
|
||||
|
||||
async_set_domains_to_be_loaded(hass, stage_all_domains)
|
||||
|
||||
if timeout is None:
|
||||
await _async_setup_multi_components(hass, domain_group, config)
|
||||
else:
|
||||
try:
|
||||
async with hass.timeout.async_timeout(timeout, cool_down=COOLDOWN_TIME):
|
||||
await _async_setup_multi_components(hass, domain_group, config)
|
||||
except TimeoutError:
|
||||
_LOGGER.warning(
|
||||
"Setup timed out for %s waiting on %s - moving forward",
|
||||
name,
|
||||
hass._active_tasks, # noqa: SLF001
|
||||
)
|
||||
|
||||
# Add after dependencies when setting up stage 2 domains
|
||||
async_set_domains_to_be_loaded(hass, stage_2_domains)
|
||||
|
||||
if stage_2_domains:
|
||||
_LOGGER.info("Setting up stage 2: %s", stage_2_domains)
|
||||
await _async_setup_multi_components(hass, stage_all_domains, config)
|
||||
continue
|
||||
try:
|
||||
async with hass.timeout.async_timeout(
|
||||
STAGE_2_TIMEOUT, cool_down=COOLDOWN_TIME
|
||||
):
|
||||
await _async_setup_multi_components(hass, stage_2_domains, config)
|
||||
async with hass.timeout.async_timeout(timeout, cool_down=COOLDOWN_TIME):
|
||||
await _async_setup_multi_components(hass, stage_all_domains, config)
|
||||
except TimeoutError:
|
||||
_LOGGER.warning(
|
||||
"Setup timed out for stage 2 waiting on %s - moving forward",
|
||||
"Setup timed out for stage %s waiting on %s - moving forward",
|
||||
name,
|
||||
hass._active_tasks, # noqa: SLF001
|
||||
)
|
||||
|
||||
@ -1046,8 +1041,6 @@ async def _async_setup_multi_components(
|
||||
config: dict[str, Any],
|
||||
) -> None:
|
||||
"""Set up multiple domains. Log on failure."""
|
||||
# Avoid creating tasks for domains that were setup in a previous stage
|
||||
domains_not_yet_setup = domains - hass.config.components
|
||||
# Create setup tasks for base platforms first since everything will have
|
||||
# to wait to be imported, and the sooner we can get the base platforms
|
||||
# loaded the sooner we can start loading the rest of the integrations.
|
||||
@ -1057,9 +1050,7 @@ async def _async_setup_multi_components(
|
||||
f"setup component {domain}",
|
||||
eager_start=True,
|
||||
)
|
||||
for domain in sorted(
|
||||
domains_not_yet_setup, key=SETUP_ORDER_SORT_KEY, reverse=True
|
||||
)
|
||||
for domain in sorted(domains, key=SETUP_ORDER_SORT_KEY, reverse=True)
|
||||
}
|
||||
results = await asyncio.gather(*futures.values(), return_exceptions=True)
|
||||
for idx, domain in enumerate(futures):
|
||||
|
@ -40,6 +40,8 @@ from .generated.ssdp import SSDP
|
||||
from .generated.usb import USB
|
||||
from .generated.zeroconf import HOMEKIT, ZEROCONF
|
||||
from .helpers.json import json_bytes, json_fragment
|
||||
from .helpers.typing import UNDEFINED, UndefinedType
|
||||
from .util.async_ import create_eager_task
|
||||
from .util.hass_dict import HassKey
|
||||
from .util.json import JSON_DECODE_EXCEPTIONS, json_loads
|
||||
|
||||
@ -758,10 +760,8 @@ class Integration:
|
||||
manifest["overwrites_built_in"] = self.overwrites_built_in
|
||||
|
||||
if self.dependencies:
|
||||
self._all_dependencies_resolved: bool | None = None
|
||||
self._all_dependencies: set[str] | None = None
|
||||
self._all_dependencies: set[str] | Exception | None = None
|
||||
else:
|
||||
self._all_dependencies_resolved = True
|
||||
self._all_dependencies = set()
|
||||
|
||||
self._platforms_to_preload = hass.data[DATA_PRELOAD_PLATFORMS]
|
||||
@ -933,47 +933,25 @@ class Integration:
|
||||
"""Return all dependencies including sub-dependencies."""
|
||||
if self._all_dependencies is None:
|
||||
raise RuntimeError("Dependencies not resolved!")
|
||||
if isinstance(self._all_dependencies, Exception):
|
||||
raise self._all_dependencies
|
||||
|
||||
return self._all_dependencies
|
||||
|
||||
@property
|
||||
def all_dependencies_resolved(self) -> bool:
|
||||
"""Return if all dependencies have been resolved."""
|
||||
return self._all_dependencies_resolved is not None
|
||||
return self._all_dependencies is not None
|
||||
|
||||
async def resolve_dependencies(self) -> bool:
|
||||
async def resolve_dependencies(self) -> set[str] | None:
|
||||
"""Resolve all dependencies."""
|
||||
if self._all_dependencies_resolved is not None:
|
||||
return self._all_dependencies_resolved
|
||||
if self._all_dependencies is not None:
|
||||
if isinstance(self._all_dependencies, Exception):
|
||||
return None
|
||||
return self._all_dependencies
|
||||
|
||||
self._all_dependencies_resolved = False
|
||||
try:
|
||||
dependencies = await _async_component_dependencies(self.hass, self)
|
||||
except IntegrationNotFound as err:
|
||||
_LOGGER.error(
|
||||
(
|
||||
"Unable to resolve dependencies for %s: unable to resolve"
|
||||
" (sub)dependency %s"
|
||||
),
|
||||
self.domain,
|
||||
err.domain,
|
||||
)
|
||||
except CircularDependency as err:
|
||||
_LOGGER.error(
|
||||
(
|
||||
"Unable to resolve dependencies for %s: it contains a circular"
|
||||
" dependency: %s -> %s"
|
||||
),
|
||||
self.domain,
|
||||
err.from_domain,
|
||||
err.to_domain,
|
||||
)
|
||||
else:
|
||||
dependencies.discard(self.domain)
|
||||
self._all_dependencies = dependencies
|
||||
self._all_dependencies_resolved = True
|
||||
|
||||
return self._all_dependencies_resolved
|
||||
result = await resolve_integrations_dependencies(self.hass, (self,))
|
||||
return result.get(self.domain)
|
||||
|
||||
async def async_get_component(self) -> ComponentProtocol:
|
||||
"""Return the component.
|
||||
@ -1441,6 +1419,189 @@ async def async_get_integrations(
|
||||
return results
|
||||
|
||||
|
||||
class _ResolveDependenciesCacheProtocol(Protocol):
|
||||
def get(self, itg: Integration) -> set[str] | Exception | None: ...
|
||||
|
||||
def __setitem__(
|
||||
self, itg: Integration, all_dependencies: set[str] | Exception
|
||||
) -> None: ...
|
||||
|
||||
|
||||
class _ResolveDependenciesCache(_ResolveDependenciesCacheProtocol):
|
||||
"""Cache for resolve_integrations_dependencies."""
|
||||
|
||||
def get(self, itg: Integration) -> set[str] | Exception | None:
|
||||
return itg._all_dependencies # noqa: SLF001
|
||||
|
||||
def __setitem__(
|
||||
self, itg: Integration, all_dependencies: set[str] | Exception
|
||||
) -> None:
|
||||
itg._all_dependencies = all_dependencies # noqa: SLF001
|
||||
|
||||
|
||||
async def resolve_integrations_dependencies(
|
||||
hass: HomeAssistant, integrations: Iterable[Integration]
|
||||
) -> dict[str, set[str]]:
|
||||
"""Resolve all dependencies for integrations.
|
||||
|
||||
Detects circular dependencies and missing integrations.
|
||||
"""
|
||||
resolved = _ResolveDependenciesCache()
|
||||
|
||||
async def _resolve_deps_catch_exceptions(itg: Integration) -> set[str] | None:
|
||||
try:
|
||||
return await _do_resolve_dependencies(itg, cache=resolved)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
_LOGGER.error("Unable to resolve dependencies for %s: %s", itg.domain, exc)
|
||||
return None
|
||||
|
||||
resolve_dependencies_tasks = {
|
||||
itg.domain: create_eager_task(
|
||||
_resolve_deps_catch_exceptions(itg),
|
||||
name=f"resolve dependencies {itg.domain}",
|
||||
loop=hass.loop,
|
||||
)
|
||||
for itg in integrations
|
||||
}
|
||||
|
||||
result = await asyncio.gather(*resolve_dependencies_tasks.values())
|
||||
|
||||
return {
|
||||
domain: deps
|
||||
for domain, deps in zip(resolve_dependencies_tasks, result, strict=True)
|
||||
if deps is not None
|
||||
}
|
||||
|
||||
|
||||
async def resolve_integrations_after_dependencies(
|
||||
hass: HomeAssistant,
|
||||
integrations: Iterable[Integration],
|
||||
possible_after_dependencies: set[str] | None = None,
|
||||
*,
|
||||
ignore_exceptions: bool = False,
|
||||
) -> dict[str, set[str]]:
|
||||
"""Resolve all dependencies, including after_dependencies, for integrations.
|
||||
|
||||
Detects circular dependencies and missing integrations.
|
||||
"""
|
||||
resolved: dict[Integration, set[str] | Exception] = {}
|
||||
|
||||
async def _resolve_deps_catch_exceptions(itg: Integration) -> set[str] | None:
|
||||
try:
|
||||
return await _do_resolve_dependencies(
|
||||
itg,
|
||||
cache=resolved,
|
||||
possible_after_dependencies=possible_after_dependencies,
|
||||
ignore_exceptions=ignore_exceptions,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
_LOGGER.error(
|
||||
"Unable to resolve (after) dependencies for %s: %s", itg.domain, exc
|
||||
)
|
||||
return None
|
||||
|
||||
resolve_dependencies_tasks = {
|
||||
itg.domain: create_eager_task(
|
||||
_resolve_deps_catch_exceptions(itg),
|
||||
name=f"resolve after dependencies {itg.domain}",
|
||||
loop=hass.loop,
|
||||
)
|
||||
for itg in integrations
|
||||
}
|
||||
|
||||
result = await asyncio.gather(*resolve_dependencies_tasks.values())
|
||||
|
||||
return {
|
||||
domain: deps
|
||||
for domain, deps in zip(resolve_dependencies_tasks, result, strict=True)
|
||||
if deps is not None
|
||||
}
|
||||
|
||||
|
||||
async def _do_resolve_dependencies(
|
||||
itg: Integration,
|
||||
*,
|
||||
cache: _ResolveDependenciesCacheProtocol,
|
||||
possible_after_dependencies: set[str] | None | UndefinedType = UNDEFINED,
|
||||
ignore_exceptions: bool = False,
|
||||
) -> set[str]:
|
||||
"""Recursively resolve all dependencies.
|
||||
|
||||
Uses `cache` to cache the results.
|
||||
|
||||
If `possible_after_dependencies` is not UNDEFINED,
|
||||
listed after dependencies are also considered.
|
||||
If `possible_after_dependencies` is None,
|
||||
all the possible after dependencies are considered.
|
||||
|
||||
If `ignore_exceptions` is True, exceptions are caught and ignored
|
||||
and the normal resolution algorithm continues.
|
||||
Otherwise, exceptions are raised.
|
||||
"""
|
||||
resolved = cache
|
||||
resolving: set[str] = set()
|
||||
|
||||
async def do_resolve_dependencies_impl(itg: Integration) -> set[str]:
|
||||
domain = itg.domain
|
||||
|
||||
# If it's already resolved, no point doing it again.
|
||||
if (result := resolved.get(itg)) is not None:
|
||||
if isinstance(result, Exception):
|
||||
raise result
|
||||
return result
|
||||
|
||||
# If we are already resolving it, we have a circular dependency.
|
||||
if domain in resolving:
|
||||
if ignore_exceptions:
|
||||
resolved[itg] = set()
|
||||
return set()
|
||||
exc = CircularDependency([domain])
|
||||
resolved[itg] = exc
|
||||
raise exc
|
||||
|
||||
resolving.add(domain)
|
||||
|
||||
dependencies_domains = set(itg.dependencies)
|
||||
if possible_after_dependencies is not UNDEFINED:
|
||||
if possible_after_dependencies is None:
|
||||
after_dependencies: Iterable[str] = itg.after_dependencies
|
||||
else:
|
||||
after_dependencies = (
|
||||
set(itg.after_dependencies) & possible_after_dependencies
|
||||
)
|
||||
dependencies_domains.update(after_dependencies)
|
||||
dependencies = await async_get_integrations(itg.hass, dependencies_domains)
|
||||
|
||||
all_dependencies: set[str] = set()
|
||||
for dep_domain, dep_integration in dependencies.items():
|
||||
if isinstance(dep_integration, Exception):
|
||||
if ignore_exceptions:
|
||||
continue
|
||||
resolved[itg] = dep_integration
|
||||
raise dep_integration
|
||||
|
||||
all_dependencies.add(dep_domain)
|
||||
|
||||
try:
|
||||
dep_dependencies = await do_resolve_dependencies_impl(dep_integration)
|
||||
except CircularDependency as exc:
|
||||
exc.extend_cycle(domain)
|
||||
resolved[itg] = exc
|
||||
raise
|
||||
except Exception as exc:
|
||||
resolved[itg] = exc
|
||||
raise
|
||||
|
||||
all_dependencies.update(dep_dependencies)
|
||||
|
||||
resolving.remove(domain)
|
||||
|
||||
resolved[itg] = all_dependencies
|
||||
return all_dependencies
|
||||
|
||||
return await do_resolve_dependencies_impl(itg)
|
||||
|
||||
|
||||
class LoaderError(Exception):
|
||||
"""Loader base error."""
|
||||
|
||||
@ -1466,11 +1627,13 @@ class IntegrationNotLoaded(LoaderError):
|
||||
class CircularDependency(LoaderError):
|
||||
"""Raised when a circular dependency is found when resolving components."""
|
||||
|
||||
def __init__(self, from_domain: str | set[str], to_domain: str) -> None:
|
||||
def __init__(self, domain_cycle: list[str]) -> None:
|
||||
"""Initialize circular dependency error."""
|
||||
super().__init__(f"Circular dependency detected: {from_domain} -> {to_domain}.")
|
||||
self.from_domain = from_domain
|
||||
self.to_domain = to_domain
|
||||
super().__init__("Circular dependency detected", domain_cycle)
|
||||
|
||||
def extend_cycle(self, domain: str) -> None:
|
||||
"""Extend the cycle with the domain."""
|
||||
self.args[1].insert(0, domain)
|
||||
|
||||
|
||||
def _load_file(
|
||||
@ -1624,50 +1787,6 @@ def bind_hass[_CallableT: Callable[..., Any]](func: _CallableT) -> _CallableT:
|
||||
return func
|
||||
|
||||
|
||||
async def _async_component_dependencies(
|
||||
hass: HomeAssistant,
|
||||
integration: Integration,
|
||||
) -> set[str]:
|
||||
"""Get component dependencies."""
|
||||
loading: set[str] = set()
|
||||
loaded: set[str] = set()
|
||||
|
||||
async def component_dependencies_impl(integration: Integration) -> None:
|
||||
"""Recursively get component dependencies."""
|
||||
domain = integration.domain
|
||||
if not (dependencies := integration.dependencies):
|
||||
loaded.add(domain)
|
||||
return
|
||||
|
||||
loading.add(domain)
|
||||
dep_integrations = await async_get_integrations(hass, dependencies)
|
||||
for dependency_domain, dep_integration in dep_integrations.items():
|
||||
if isinstance(dep_integration, Exception):
|
||||
raise dep_integration
|
||||
|
||||
# If we are already loading it, we have a circular dependency.
|
||||
# We have to check it here to make sure that every integration that
|
||||
# depends on us, does not appear in our own after_dependencies.
|
||||
if conflict := loading.intersection(dep_integration.after_dependencies):
|
||||
raise CircularDependency(conflict, dependency_domain)
|
||||
|
||||
# If we have already loaded it, no point doing it again.
|
||||
if dependency_domain in loaded:
|
||||
continue
|
||||
|
||||
# If we are already loading it, we have a circular dependency.
|
||||
if dependency_domain in loading:
|
||||
raise CircularDependency(dependency_domain, domain)
|
||||
|
||||
await component_dependencies_impl(dep_integration)
|
||||
loading.remove(domain)
|
||||
loaded.add(domain)
|
||||
|
||||
await component_dependencies_impl(integration)
|
||||
|
||||
return loaded
|
||||
|
||||
|
||||
def _async_mount_config_dir(hass: HomeAssistant) -> None:
|
||||
"""Mount config dir in order to load custom_component.
|
||||
|
||||
|
@ -323,7 +323,7 @@ async def _async_setup_component(
|
||||
translation.async_load_integrations(hass, integration_set), loop=hass.loop
|
||||
)
|
||||
# Validate all dependencies exist and there are no circular dependencies
|
||||
if not await integration.resolve_dependencies():
|
||||
if await integration.resolve_dependencies() is None:
|
||||
return False
|
||||
|
||||
# Process requirements as soon as possible, so we can import the component
|
||||
|
@ -572,7 +572,7 @@ async def test_setup_after_deps_not_present(hass: HomeAssistant) -> None:
|
||||
MockModule(
|
||||
domain="second_dep",
|
||||
async_setup=gen_domain_setup("second_dep"),
|
||||
partial_manifest={"after_dependencies": ["first_dep"]},
|
||||
partial_manifest={"after_dependencies": ["first_dep", "root"]},
|
||||
),
|
||||
)
|
||||
|
||||
@ -1169,6 +1169,7 @@ async def test_bootstrap_is_cancellation_safe(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""Test cancellation during async_setup_component does not cancel bootstrap."""
|
||||
mock_integration(hass, MockModule(domain="cancel_integration"))
|
||||
with patch.object(
|
||||
bootstrap, "async_setup_component", side_effect=asyncio.CancelledError
|
||||
):
|
||||
@ -1185,6 +1186,18 @@ async def test_bootstrap_empty_integrations(hass: HomeAssistant) -> None:
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("load_registries", [False])
|
||||
async def test_bootstrap_log_already_setup_stage(
|
||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||
) -> None:
|
||||
"""Test logging when all integrations in a stage were already setup."""
|
||||
with patch.object(bootstrap, "STAGE_1_INTEGRATIONS", {"frontend"}):
|
||||
await bootstrap._async_set_up_integrations(hass, {})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert "Already set up stage 1: {'frontend'}" in caplog.text
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_mqtt_config_flow")
|
||||
def mock_mqtt_config_flow_fixture() -> Generator[None]:
|
||||
"""Mock MQTT config flow."""
|
||||
|
@ -27,33 +27,42 @@ async def test_circular_component_dependencies(hass: HomeAssistant) -> None:
|
||||
mock_integration(hass, MockModule("mod2", dependencies=["mod1"]))
|
||||
mock_integration(hass, MockModule("mod3", dependencies=["mod1"]))
|
||||
mod_4 = mock_integration(hass, MockModule("mod4", dependencies=["mod2", "mod3"]))
|
||||
all_domains = {"mod1", "mod2", "mod3", "mod4"}
|
||||
|
||||
deps = await loader._async_component_dependencies(hass, mod_4)
|
||||
assert deps == {"mod1", "mod2", "mod3", "mod4"}
|
||||
deps = await loader._do_resolve_dependencies(mod_4, cache={})
|
||||
assert deps == {"mod1", "mod2", "mod3"}
|
||||
|
||||
# Create a circular dependency
|
||||
mock_integration(hass, MockModule("mod1", dependencies=["mod4"]))
|
||||
with pytest.raises(loader.CircularDependency):
|
||||
await loader._async_component_dependencies(hass, mod_4)
|
||||
await loader._do_resolve_dependencies(mod_4, cache={})
|
||||
|
||||
# Create a different circular dependency
|
||||
mock_integration(hass, MockModule("mod1", dependencies=["mod3"]))
|
||||
with pytest.raises(loader.CircularDependency):
|
||||
await loader._async_component_dependencies(hass, mod_4)
|
||||
await loader._do_resolve_dependencies(mod_4, cache={})
|
||||
|
||||
# Create a circular after_dependency
|
||||
mock_integration(
|
||||
hass, MockModule("mod1", partial_manifest={"after_dependencies": ["mod4"]})
|
||||
)
|
||||
with pytest.raises(loader.CircularDependency):
|
||||
await loader._async_component_dependencies(hass, mod_4)
|
||||
await loader._do_resolve_dependencies(
|
||||
mod_4,
|
||||
cache={},
|
||||
possible_after_dependencies=all_domains,
|
||||
)
|
||||
|
||||
# Create a different circular after_dependency
|
||||
mock_integration(
|
||||
hass, MockModule("mod1", partial_manifest={"after_dependencies": ["mod3"]})
|
||||
)
|
||||
with pytest.raises(loader.CircularDependency):
|
||||
await loader._async_component_dependencies(hass, mod_4)
|
||||
await loader._do_resolve_dependencies(
|
||||
mod_4,
|
||||
cache={},
|
||||
possible_after_dependencies=all_domains,
|
||||
)
|
||||
|
||||
# Create a circular after_dependency without a hard dependency
|
||||
mock_integration(
|
||||
@ -62,29 +71,48 @@ async def test_circular_component_dependencies(hass: HomeAssistant) -> None:
|
||||
mod_4 = mock_integration(
|
||||
hass, MockModule("mod4", partial_manifest={"after_dependencies": ["mod2"]})
|
||||
)
|
||||
# this currently doesn't raise, but it should. Will be improved in a follow-up.
|
||||
await loader._async_component_dependencies(hass, mod_4)
|
||||
with pytest.raises(loader.CircularDependency):
|
||||
await loader._do_resolve_dependencies(
|
||||
mod_4,
|
||||
cache={},
|
||||
possible_after_dependencies=all_domains,
|
||||
)
|
||||
|
||||
result = await loader.resolve_integrations_after_dependencies(hass, (mod_4,))
|
||||
assert result == {}
|
||||
result = await loader.resolve_integrations_after_dependencies(
|
||||
hass, (mod_4,), ignore_exceptions=True
|
||||
)
|
||||
assert result["mod4"] == {"mod4", "mod2", "mod1"}
|
||||
|
||||
|
||||
async def test_nonexistent_component_dependencies(hass: HomeAssistant) -> None:
|
||||
"""Test if we can detect nonexistent dependencies of components."""
|
||||
mod_1 = mock_integration(hass, MockModule("mod1", dependencies=["nonexistent"]))
|
||||
with pytest.raises(loader.IntegrationNotFound):
|
||||
await loader._async_component_dependencies(hass, mod_1)
|
||||
mod_2 = mock_integration(hass, MockModule("mod2", dependencies=["mod1"]))
|
||||
|
||||
assert not await mod_2.resolve_dependencies()
|
||||
assert await mod_2.resolve_dependencies() is None
|
||||
assert mod_2.all_dependencies_resolved
|
||||
with pytest.raises(RuntimeError):
|
||||
with pytest.raises(loader.IntegrationNotFound):
|
||||
mod_2.all_dependencies # noqa: B018
|
||||
|
||||
# this currently is not resolved, because intermediate results are not cached
|
||||
# this will be improved in a follow-up
|
||||
assert not mod_1.all_dependencies_resolved
|
||||
assert not await mod_1.resolve_dependencies()
|
||||
with pytest.raises(RuntimeError):
|
||||
assert mod_1.all_dependencies_resolved
|
||||
assert await mod_1.resolve_dependencies() is None
|
||||
with pytest.raises(loader.IntegrationNotFound):
|
||||
mod_1.all_dependencies # noqa: B018
|
||||
|
||||
result = await loader.resolve_integrations_dependencies(hass, (mod_2, mod_1))
|
||||
assert result == {}
|
||||
|
||||
mod_1 = mock_integration(
|
||||
hass,
|
||||
MockModule("mod1", partial_manifest={"after_dependencies": ["non.existent"]}),
|
||||
)
|
||||
mod_2 = mock_integration(hass, MockModule("mod2", dependencies=["mod1"]))
|
||||
|
||||
result = await loader.resolve_integrations_after_dependencies(hass, (mod_2, mod_1))
|
||||
assert result == {}
|
||||
|
||||
|
||||
def test_component_loader(hass: HomeAssistant) -> None:
|
||||
"""Test loading components."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user