mirror of
https://github.com/home-assistant/core.git
synced 2025-04-26 02:07:54 +00:00
Group helpers of set_up_integrations in bootstrap (#137673)
This commit is contained in:
parent
6cb0201031
commit
ae55e26546
@ -716,109 +716,6 @@ def _get_domains(hass: core.HomeAssistant, config: dict[str, Any]) -> set[str]:
|
|||||||
return domains
|
return domains
|
||||||
|
|
||||||
|
|
||||||
class _WatchPendingSetups:
|
|
||||||
"""Periodic log and dispatch of setups that are pending."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
hass: core.HomeAssistant,
|
|
||||||
setup_started: dict[tuple[str, str | None], float],
|
|
||||||
) -> None:
|
|
||||||
"""Initialize the WatchPendingSetups class."""
|
|
||||||
self._hass = hass
|
|
||||||
self._setup_started = setup_started
|
|
||||||
self._duration_count = 0
|
|
||||||
self._handle: asyncio.TimerHandle | None = None
|
|
||||||
self._previous_was_empty = True
|
|
||||||
self._loop = hass.loop
|
|
||||||
|
|
||||||
def _async_watch(self) -> None:
|
|
||||||
"""Periodic log of setups that are pending."""
|
|
||||||
now = monotonic()
|
|
||||||
self._duration_count += SLOW_STARTUP_CHECK_INTERVAL
|
|
||||||
|
|
||||||
remaining_with_setup_started: defaultdict[str, float] = defaultdict(float)
|
|
||||||
for integration_group, start_time in self._setup_started.items():
|
|
||||||
domain, _ = integration_group
|
|
||||||
remaining_with_setup_started[domain] += now - start_time
|
|
||||||
|
|
||||||
if remaining_with_setup_started:
|
|
||||||
_LOGGER.debug("Integration remaining: %s", remaining_with_setup_started)
|
|
||||||
elif waiting_tasks := self._hass._active_tasks: # noqa: SLF001
|
|
||||||
_LOGGER.debug("Waiting on tasks: %s", waiting_tasks)
|
|
||||||
self._async_dispatch(remaining_with_setup_started)
|
|
||||||
if (
|
|
||||||
self._setup_started
|
|
||||||
and self._duration_count % LOG_SLOW_STARTUP_INTERVAL == 0
|
|
||||||
):
|
|
||||||
# We log every LOG_SLOW_STARTUP_INTERVAL until all integrations are done
|
|
||||||
# once we take over LOG_SLOW_STARTUP_INTERVAL (60s) to start up
|
|
||||||
_LOGGER.warning(
|
|
||||||
"Waiting on integrations to complete setup: %s",
|
|
||||||
self._setup_started,
|
|
||||||
)
|
|
||||||
|
|
||||||
_LOGGER.debug("Running timeout Zones: %s", self._hass.timeout.zones)
|
|
||||||
self._async_schedule_next()
|
|
||||||
|
|
||||||
def _async_dispatch(self, remaining_with_setup_started: dict[str, float]) -> None:
|
|
||||||
"""Dispatch the signal."""
|
|
||||||
if remaining_with_setup_started or not self._previous_was_empty:
|
|
||||||
async_dispatcher_send_internal(
|
|
||||||
self._hass, SIGNAL_BOOTSTRAP_INTEGRATIONS, remaining_with_setup_started
|
|
||||||
)
|
|
||||||
self._previous_was_empty = not remaining_with_setup_started
|
|
||||||
|
|
||||||
def _async_schedule_next(self) -> None:
|
|
||||||
"""Schedule the next call."""
|
|
||||||
self._handle = self._loop.call_later(
|
|
||||||
SLOW_STARTUP_CHECK_INTERVAL, self._async_watch
|
|
||||||
)
|
|
||||||
|
|
||||||
def async_start(self) -> None:
|
|
||||||
"""Start watching."""
|
|
||||||
self._async_schedule_next()
|
|
||||||
|
|
||||||
def async_stop(self) -> None:
|
|
||||||
"""Stop watching."""
|
|
||||||
self._async_dispatch({})
|
|
||||||
if self._handle:
|
|
||||||
self._handle.cancel()
|
|
||||||
self._handle = None
|
|
||||||
|
|
||||||
|
|
||||||
async def async_setup_multi_components(
|
|
||||||
hass: core.HomeAssistant,
|
|
||||||
domains: set[str],
|
|
||||||
config: dict[str, Any],
|
|
||||||
) -> None:
|
|
||||||
"""Set up multiple domains. Log on failure."""
|
|
||||||
# Avoid creating tasks for domains that were setup in a previous stage
|
|
||||||
domains_not_yet_setup = domains - hass.config.components
|
|
||||||
# Create setup tasks for base platforms first since everything will have
|
|
||||||
# to wait to be imported, and the sooner we can get the base platforms
|
|
||||||
# loaded the sooner we can start loading the rest of the integrations.
|
|
||||||
futures = {
|
|
||||||
domain: hass.async_create_task_internal(
|
|
||||||
async_setup_component(hass, domain, config),
|
|
||||||
f"setup component {domain}",
|
|
||||||
eager_start=True,
|
|
||||||
)
|
|
||||||
for domain in sorted(
|
|
||||||
domains_not_yet_setup, key=SETUP_ORDER_SORT_KEY, reverse=True
|
|
||||||
)
|
|
||||||
}
|
|
||||||
results = await asyncio.gather(*futures.values(), return_exceptions=True)
|
|
||||||
for idx, domain in enumerate(futures):
|
|
||||||
result = results[idx]
|
|
||||||
if isinstance(result, BaseException):
|
|
||||||
_LOGGER.error(
|
|
||||||
"Error setting up integration %s - received exception",
|
|
||||||
domain,
|
|
||||||
exc_info=(type(result), result, result.__traceback__),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def _async_resolve_domains_to_setup(
|
async def _async_resolve_domains_to_setup(
|
||||||
hass: core.HomeAssistant, config: dict[str, Any]
|
hass: core.HomeAssistant, config: dict[str, Any]
|
||||||
) -> tuple[set[str], dict[str, loader.Integration]]:
|
) -> tuple[set[str], dict[str, loader.Integration]]:
|
||||||
@ -1038,7 +935,7 @@ async def _async_set_up_integrations(
|
|||||||
for dep in integration.all_dependencies
|
for dep in integration.all_dependencies
|
||||||
)
|
)
|
||||||
async_set_domains_to_be_loaded(hass, to_be_loaded)
|
async_set_domains_to_be_loaded(hass, to_be_loaded)
|
||||||
await async_setup_multi_components(hass, domain_group, config)
|
await _async_setup_multi_components(hass, domain_group, config)
|
||||||
|
|
||||||
# Enables after dependencies when setting up stage 1 domains
|
# Enables after dependencies when setting up stage 1 domains
|
||||||
async_set_domains_to_be_loaded(hass, stage_1_domains)
|
async_set_domains_to_be_loaded(hass, stage_1_domains)
|
||||||
@ -1050,7 +947,7 @@ async def _async_set_up_integrations(
|
|||||||
async with hass.timeout.async_timeout(
|
async with hass.timeout.async_timeout(
|
||||||
STAGE_1_TIMEOUT, cool_down=COOLDOWN_TIME
|
STAGE_1_TIMEOUT, cool_down=COOLDOWN_TIME
|
||||||
):
|
):
|
||||||
await async_setup_multi_components(hass, stage_1_domains, config)
|
await _async_setup_multi_components(hass, stage_1_domains, config)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"Setup timed out for stage 1 waiting on %s - moving forward",
|
"Setup timed out for stage 1 waiting on %s - moving forward",
|
||||||
@ -1066,7 +963,7 @@ async def _async_set_up_integrations(
|
|||||||
async with hass.timeout.async_timeout(
|
async with hass.timeout.async_timeout(
|
||||||
STAGE_2_TIMEOUT, cool_down=COOLDOWN_TIME
|
STAGE_2_TIMEOUT, cool_down=COOLDOWN_TIME
|
||||||
):
|
):
|
||||||
await async_setup_multi_components(hass, stage_2_domains, config)
|
await _async_setup_multi_components(hass, stage_2_domains, config)
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"Setup timed out for stage 2 waiting on %s - moving forward",
|
"Setup timed out for stage 2 waiting on %s - moving forward",
|
||||||
@ -1092,3 +989,106 @@ async def _async_set_up_integrations(
|
|||||||
"Integration setup times: %s",
|
"Integration setup times: %s",
|
||||||
dict(sorted(setup_time.items(), key=itemgetter(1), reverse=True)),
|
dict(sorted(setup_time.items(), key=itemgetter(1), reverse=True)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class _WatchPendingSetups:
|
||||||
|
"""Periodic log and dispatch of setups that are pending."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
hass: core.HomeAssistant,
|
||||||
|
setup_started: dict[tuple[str, str | None], float],
|
||||||
|
) -> None:
|
||||||
|
"""Initialize the WatchPendingSetups class."""
|
||||||
|
self._hass = hass
|
||||||
|
self._setup_started = setup_started
|
||||||
|
self._duration_count = 0
|
||||||
|
self._handle: asyncio.TimerHandle | None = None
|
||||||
|
self._previous_was_empty = True
|
||||||
|
self._loop = hass.loop
|
||||||
|
|
||||||
|
def _async_watch(self) -> None:
|
||||||
|
"""Periodic log of setups that are pending."""
|
||||||
|
now = monotonic()
|
||||||
|
self._duration_count += SLOW_STARTUP_CHECK_INTERVAL
|
||||||
|
|
||||||
|
remaining_with_setup_started: defaultdict[str, float] = defaultdict(float)
|
||||||
|
for integration_group, start_time in self._setup_started.items():
|
||||||
|
domain, _ = integration_group
|
||||||
|
remaining_with_setup_started[domain] += now - start_time
|
||||||
|
|
||||||
|
if remaining_with_setup_started:
|
||||||
|
_LOGGER.debug("Integration remaining: %s", remaining_with_setup_started)
|
||||||
|
elif waiting_tasks := self._hass._active_tasks: # noqa: SLF001
|
||||||
|
_LOGGER.debug("Waiting on tasks: %s", waiting_tasks)
|
||||||
|
self._async_dispatch(remaining_with_setup_started)
|
||||||
|
if (
|
||||||
|
self._setup_started
|
||||||
|
and self._duration_count % LOG_SLOW_STARTUP_INTERVAL == 0
|
||||||
|
):
|
||||||
|
# We log every LOG_SLOW_STARTUP_INTERVAL until all integrations are done
|
||||||
|
# once we take over LOG_SLOW_STARTUP_INTERVAL (60s) to start up
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Waiting on integrations to complete setup: %s",
|
||||||
|
self._setup_started,
|
||||||
|
)
|
||||||
|
|
||||||
|
_LOGGER.debug("Running timeout Zones: %s", self._hass.timeout.zones)
|
||||||
|
self._async_schedule_next()
|
||||||
|
|
||||||
|
def _async_dispatch(self, remaining_with_setup_started: dict[str, float]) -> None:
|
||||||
|
"""Dispatch the signal."""
|
||||||
|
if remaining_with_setup_started or not self._previous_was_empty:
|
||||||
|
async_dispatcher_send_internal(
|
||||||
|
self._hass, SIGNAL_BOOTSTRAP_INTEGRATIONS, remaining_with_setup_started
|
||||||
|
)
|
||||||
|
self._previous_was_empty = not remaining_with_setup_started
|
||||||
|
|
||||||
|
def _async_schedule_next(self) -> None:
|
||||||
|
"""Schedule the next call."""
|
||||||
|
self._handle = self._loop.call_later(
|
||||||
|
SLOW_STARTUP_CHECK_INTERVAL, self._async_watch
|
||||||
|
)
|
||||||
|
|
||||||
|
def async_start(self) -> None:
|
||||||
|
"""Start watching."""
|
||||||
|
self._async_schedule_next()
|
||||||
|
|
||||||
|
def async_stop(self) -> None:
|
||||||
|
"""Stop watching."""
|
||||||
|
self._async_dispatch({})
|
||||||
|
if self._handle:
|
||||||
|
self._handle.cancel()
|
||||||
|
self._handle = None
|
||||||
|
|
||||||
|
|
||||||
|
async def _async_setup_multi_components(
|
||||||
|
hass: core.HomeAssistant,
|
||||||
|
domains: set[str],
|
||||||
|
config: dict[str, Any],
|
||||||
|
) -> None:
|
||||||
|
"""Set up multiple domains. Log on failure."""
|
||||||
|
# Avoid creating tasks for domains that were setup in a previous stage
|
||||||
|
domains_not_yet_setup = domains - hass.config.components
|
||||||
|
# Create setup tasks for base platforms first since everything will have
|
||||||
|
# to wait to be imported, and the sooner we can get the base platforms
|
||||||
|
# loaded the sooner we can start loading the rest of the integrations.
|
||||||
|
futures = {
|
||||||
|
domain: hass.async_create_task_internal(
|
||||||
|
async_setup_component(hass, domain, config),
|
||||||
|
f"setup component {domain}",
|
||||||
|
eager_start=True,
|
||||||
|
)
|
||||||
|
for domain in sorted(
|
||||||
|
domains_not_yet_setup, key=SETUP_ORDER_SORT_KEY, reverse=True
|
||||||
|
)
|
||||||
|
}
|
||||||
|
results = await asyncio.gather(*futures.values(), return_exceptions=True)
|
||||||
|
for idx, domain in enumerate(futures):
|
||||||
|
result = results[idx]
|
||||||
|
if isinstance(result, BaseException):
|
||||||
|
_LOGGER.error(
|
||||||
|
"Error setting up integration %s - received exception",
|
||||||
|
domain,
|
||||||
|
exc_info=(type(result), result, result.__traceback__),
|
||||||
|
)
|
||||||
|
@ -1176,7 +1176,7 @@ async def test_bootstrap_is_cancellation_safe(
|
|||||||
@pytest.mark.parametrize("load_registries", [False])
|
@pytest.mark.parametrize("load_registries", [False])
|
||||||
async def test_bootstrap_empty_integrations(hass: HomeAssistant) -> None:
|
async def test_bootstrap_empty_integrations(hass: HomeAssistant) -> None:
|
||||||
"""Test setting up an empty integrations does not raise."""
|
"""Test setting up an empty integrations does not raise."""
|
||||||
await bootstrap.async_setup_multi_components(hass, set(), {})
|
await bootstrap._async_setup_multi_components(hass, set(), {})
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
|
|
||||||
@ -1311,7 +1311,7 @@ async def test_bootstrap_dependencies(
|
|||||||
),
|
),
|
||||||
):
|
):
|
||||||
bootstrap.async_set_domains_to_be_loaded(hass, {integration})
|
bootstrap.async_set_domains_to_be_loaded(hass, {integration})
|
||||||
await bootstrap.async_setup_multi_components(hass, {integration}, {})
|
await bootstrap._async_setup_multi_components(hass, {integration}, {})
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
for assertion in assertions:
|
for assertion in assertions:
|
||||||
@ -1407,7 +1407,7 @@ async def test_cancellation_does_not_leak_upward_from_async_setup(
|
|||||||
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test setting up an integration that raises asyncio.CancelledError."""
|
"""Test setting up an integration that raises asyncio.CancelledError."""
|
||||||
await bootstrap.async_setup_multi_components(
|
await bootstrap._async_setup_multi_components(
|
||||||
hass, {"test_package_raises_cancelled_error"}, {}
|
hass, {"test_package_raises_cancelled_error"}, {}
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
@ -1428,12 +1428,12 @@ async def test_cancellation_does_not_leak_upward_from_async_setup_entry(
|
|||||||
domain="test_package_raises_cancelled_error_config_entry", data={}
|
domain="test_package_raises_cancelled_error_config_entry", data={}
|
||||||
)
|
)
|
||||||
entry.add_to_hass(hass)
|
entry.add_to_hass(hass)
|
||||||
await bootstrap.async_setup_multi_components(
|
await bootstrap._async_setup_multi_components(
|
||||||
hass, {"test_package_raises_cancelled_error_config_entry"}, {}
|
hass, {"test_package_raises_cancelled_error_config_entry"}, {}
|
||||||
)
|
)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
|
||||||
await bootstrap.async_setup_multi_components(hass, {"test_package"}, {})
|
await bootstrap._async_setup_multi_components(hass, {"test_package"}, {})
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
assert (
|
assert (
|
||||||
"Error setting up entry Mock Title for test_package_raises_cancelled_error_config_entry"
|
"Error setting up entry Mock Title for test_package_raises_cancelled_error_config_entry"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user