From 69307374f49f945f955379b2ffaf66521d192194 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 7 Jan 2024 17:55:40 -1000 Subject: [PATCH] Signficantly reduce executor contention during bootstrap (#107312) * Signficantly reduce executor contention during bootstrap At startup we have a thundering herd wanting to use the executor to load manifiest.json. Since we know which integrations we are about to load in each resolver step, group the manifest loads into single executor jobs by calling async_get_integrations on the deps of the integrations after they are resolved. In practice this reduced the number of executor jobs by 80% during bootstrap * merge * naming * tweak * tweak * not enough contention to be worth it there * refactor to avoid waiting * refactor to avoid waiting * tweaks * tweaks * tweak * background is fine * comment --- homeassistant/bootstrap.py | 41 ++++++++++++++++++++++++++++++----- homeassistant/requirements.py | 19 ++++++++++++++++ homeassistant/util/package.py | 5 +++++ 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py index 83b2f18719f..bca74a684b2 100644 --- a/homeassistant/bootstrap.py +++ b/homeassistant/bootstrap.py @@ -16,7 +16,7 @@ from typing import TYPE_CHECKING, Any import voluptuous as vol import yarl -from . import config as conf_util, config_entries, core, loader +from . import config as conf_util, config_entries, core, loader, requirements from .components import http from .const import ( FORMAT_DATETIME, @@ -229,7 +229,7 @@ def open_hass_ui(hass: core.HomeAssistant) -> None: ) -async def load_registries(hass: core.HomeAssistant) -> None: +async def async_load_base_functionality(hass: core.HomeAssistant) -> None: """Load the registries and cache the result of platform.uname().processor.""" if DATA_REGISTRIES_LOADED in hass.data: return @@ -256,6 +256,7 @@ async def load_registries(hass: core.HomeAssistant) -> None: hass.async_add_executor_job(_cache_uname_processor), template.async_load_custom_templates(hass), restore_state.async_load(hass), + hass.config_entries.async_initialize(), ) @@ -270,8 +271,7 @@ async def async_from_config_dict( start = monotonic() hass.config_entries = config_entries.ConfigEntries(hass, config) - await hass.config_entries.async_initialize() - await load_registries(hass) + await async_load_base_functionality(hass) # Set up core. _LOGGER.debug("Setting up %s", CORE_INTEGRATIONS) @@ -527,11 +527,13 @@ async def async_setup_multi_components( config: dict[str, Any], ) -> None: """Set up multiple domains. Log on failure.""" + # Avoid creating tasks for domains that were setup in a previous stage + domains_not_yet_setup = domains - hass.config.components futures = { domain: hass.async_create_task( async_setup_component(hass, domain, config), f"setup component {domain}" ) - for domain in domains + for domain in domains_not_yet_setup } results = await asyncio.gather(*futures.values(), return_exceptions=True) for idx, domain in enumerate(futures): @@ -555,6 +557,8 @@ async def _async_set_up_integrations( domains_to_setup = _get_domains(hass, config) + needed_requirements: set[str] = set() + # Resolve all dependencies so we know all integrations # that will have to be loaded and start rightaway integration_cache: dict[str, loader.Integration] = {} @@ -570,6 +574,25 @@ async def _async_set_up_integrations( ).values() if isinstance(int_or_exc, loader.Integration) ] + + manifest_deps: set[str] = set() + for itg in integrations_to_process: + manifest_deps.update(itg.dependencies) + manifest_deps.update(itg.after_dependencies) + needed_requirements.update(itg.requirements) + + if manifest_deps: + # If there are dependencies, try to preload all + # the integrations manifest at once and add them + # to the list of requirements we need to install + # so we can try to check if they are already installed + # in a single call below which avoids each integration + # having to wait for the lock to do it individually + deps = await loader.async_get_integrations(hass, manifest_deps) + for dependant_itg in deps.values(): + if isinstance(dependant_itg, loader.Integration): + needed_requirements.update(dependant_itg.requirements) + resolve_dependencies_tasks = [ itg.resolve_dependencies() for itg in integrations_to_process @@ -591,6 +614,14 @@ async def _async_set_up_integrations( _LOGGER.info("Domains to be set up: %s", domains_to_setup) + # Optimistically check if requirements are already installed + # ahead of setting up the integrations so we can prime the cache + # We do not wait for this since its an optimization only + hass.async_create_background_task( + requirements.async_load_installed_versions(hass, needed_requirements), + "check installed requirements", + ) + # Initialize recorder if "recorder" in domains_to_setup: recorder.async_initialize_recorder(hass) diff --git a/homeassistant/requirements.py b/homeassistant/requirements.py index 27a9607a6ee..9892a4d9169 100644 --- a/homeassistant/requirements.py +++ b/homeassistant/requirements.py @@ -63,6 +63,13 @@ async def async_process_requirements( await _async_get_manager(hass).async_process_requirements(name, requirements) +async def async_load_installed_versions( + hass: HomeAssistant, requirements: set[str] +) -> None: + """Load the installed version of requirements.""" + await _async_get_manager(hass).async_load_installed_versions(requirements) + + @callback def _async_get_manager(hass: HomeAssistant) -> RequirementsManager: """Get the requirements manager.""" @@ -284,3 +291,15 @@ class RequirementsManager: self.install_failure_history |= failures if failures: raise RequirementsNotFound(name, list(failures)) + + async def async_load_installed_versions( + self, + requirements: set[str], + ) -> None: + """Load the installed version of requirements.""" + if not (requirements_to_check := requirements - self.is_installed_cache): + return + + self.is_installed_cache |= await self.hass.async_add_executor_job( + pkg_util.get_installed_versions, requirements_to_check + ) diff --git a/homeassistant/util/package.py b/homeassistant/util/package.py index 61d282931c0..d487edee4a4 100644 --- a/homeassistant/util/package.py +++ b/homeassistant/util/package.py @@ -29,6 +29,11 @@ def is_docker_env() -> bool: return Path("/.dockerenv").exists() +def get_installed_versions(specifiers: set[str]) -> set[str]: + """Return a set of installed packages and versions.""" + return {specifier for specifier in specifiers if is_installed(specifier)} + + def is_installed(requirement_str: str) -> bool: """Check if a package is installed and will be loaded when we import it.