From 94bf55e29b89b448b6f45c15a9b7031aa0d59a1c Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Tue, 10 Nov 2020 21:34:54 -1000 Subject: [PATCH] Limit concurrency of async_get_integration to avoid creating extra threads (#43085) * Limit concurrency of async_get_integration to avoid creating extra threads Since async_get_integration is waiting on the disk most of the time it would end up creating many new threads because the disk could not deliver the data in time. * pylint --- homeassistant/bootstrap.py | 6 +++++- homeassistant/helpers/service.py | 13 ++++++++++--- homeassistant/helpers/translation.py | 7 +++++-- homeassistant/loader.py | 2 ++ homeassistant/util/async_.py | 24 +++++++++++++++++++++--- tests/util/test_async.py | 25 +++++++++++++++++++++++++ 6 files changed, 68 insertions(+), 9 deletions(-) diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py index 2778747f15d..0d63307a020 100644 --- a/homeassistant/bootstrap.py +++ b/homeassistant/bootstrap.py @@ -28,6 +28,7 @@ from homeassistant.setup import ( async_set_domains_to_be_loaded, async_setup_component, ) +from homeassistant.util.async_ import gather_with_concurrency from homeassistant.util.logging import async_activate_log_queue_handler from homeassistant.util.package import async_get_user_site, is_virtual_env from homeassistant.util.yaml import clear_secret_cache @@ -49,6 +50,8 @@ STAGE_2_TIMEOUT = 300 WRAP_UP_TIMEOUT = 300 COOLDOWN_TIME = 60 +MAX_LOAD_CONCURRENTLY = 6 + DEBUGGER_INTEGRATIONS = {"debugpy", "ptvsd"} CORE_INTEGRATIONS = ("homeassistant", "persistent_notification") LOGGING_INTEGRATIONS = { @@ -442,7 +445,8 @@ async def _async_set_up_integrations( integrations_to_process = [ int_or_exc - for int_or_exc in await asyncio.gather( + for int_or_exc in await gather_with_concurrency( + loader.MAX_LOAD_CONCURRENTLY, *( loader.async_get_integration(hass, domain) for domain in old_to_resolve diff --git a/homeassistant/helpers/service.py b/homeassistant/helpers/service.py index 0993f490537..06d0ae46ae3 100644 --- a/homeassistant/helpers/service.py +++ b/homeassistant/helpers/service.py @@ -38,7 +38,13 @@ from homeassistant.helpers import template import homeassistant.helpers.config_validation as cv from homeassistant.helpers.template import Template from homeassistant.helpers.typing import ConfigType, HomeAssistantType, TemplateVarsType -from homeassistant.loader import Integration, async_get_integration, bind_hass +from homeassistant.loader import ( + MAX_LOAD_CONCURRENTLY, + Integration, + async_get_integration, + bind_hass, +) +from homeassistant.util.async_ import gather_with_concurrency from homeassistant.util.yaml import load_yaml from homeassistant.util.yaml.loader import JSON_TYPE @@ -307,8 +313,9 @@ async def async_get_all_descriptions( loaded = {} if missing: - integrations = await asyncio.gather( - *(async_get_integration(hass, domain) for domain in missing) + integrations = await gather_with_concurrency( + MAX_LOAD_CONCURRENTLY, + *(async_get_integration(hass, domain) for domain in missing), ) contents = await hass.async_add_executor_job( diff --git a/homeassistant/helpers/translation.py b/homeassistant/helpers/translation.py index 0ab25aba777..bd229d79111 100644 --- a/homeassistant/helpers/translation.py +++ b/homeassistant/helpers/translation.py @@ -6,11 +6,13 @@ from typing import Any, Dict, List, Optional, Set from homeassistant.core import callback from homeassistant.loader import ( + MAX_LOAD_CONCURRENTLY, Integration, async_get_config_flows, async_get_integration, bind_hass, ) +from homeassistant.util.async_ import gather_with_concurrency from homeassistant.util.json import load_json from .typing import HomeAssistantType @@ -151,8 +153,9 @@ async def async_get_component_strings( integrations = dict( zip( domains, - await asyncio.gather( - *[async_get_integration(hass, domain) for domain in domains] + await gather_with_concurrency( + MAX_LOAD_CONCURRENTLY, + *[async_get_integration(hass, domain) for domain in domains], ), ) ) diff --git a/homeassistant/loader.py b/homeassistant/loader.py index cb701213fe7..6dabfdf0447 100644 --- a/homeassistant/loader.py +++ b/homeassistant/loader.py @@ -50,6 +50,8 @@ CUSTOM_WARNING = ( ) _UNDEF = object() +MAX_LOAD_CONCURRENTLY = 4 + def manifest_from_legacy_module(domain: str, module: ModuleType) -> Dict: """Generate a manifest from a legacy module.""" diff --git a/homeassistant/util/async_.py b/homeassistant/util/async_.py index 0b48e2159c0..ded44473038 100644 --- a/homeassistant/util/async_.py +++ b/homeassistant/util/async_.py @@ -1,12 +1,12 @@ -"""Asyncio backports for Python 3.6 compatibility.""" -from asyncio import coroutines, ensure_future, get_running_loop +"""Asyncio utilities.""" +from asyncio import Semaphore, coroutines, ensure_future, gather, get_running_loop from asyncio.events import AbstractEventLoop import concurrent.futures import functools import logging import threading from traceback import extract_stack -from typing import Any, Callable, Coroutine, TypeVar +from typing import Any, Awaitable, Callable, Coroutine, TypeVar _LOGGER = logging.getLogger(__name__) @@ -121,3 +121,21 @@ def protect_loop(func: Callable) -> Callable: return func(*args, **kwargs) return protected_loop_func + + +async def gather_with_concurrency( + limit: int, *tasks: Any, return_exceptions: bool = False +) -> Any: + """Wrap asyncio.gather to limit the number of concurrent tasks. + + From: https://stackoverflow.com/a/61478547/9127614 + """ + semaphore = Semaphore(limit) + + async def sem_task(task: Awaitable[Any]) -> Any: + async with semaphore: + return await task + + return await gather( + *(sem_task(task) for task in tasks), return_exceptions=return_exceptions + ) diff --git a/tests/util/test_async.py b/tests/util/test_async.py index 460490eb783..047697168b4 100644 --- a/tests/util/test_async.py +++ b/tests/util/test_async.py @@ -1,4 +1,7 @@ """Tests for async util methods from Python source.""" +import asyncio +import time + import pytest from homeassistant.util import async_ as hasync @@ -144,3 +147,25 @@ def test_protect_loop_sync(): hasync.protect_loop(calls.append)(1) assert len(mock_loop.mock_calls) == 1 assert calls == [1] + + +async def test_gather_with_concurrency(): + """Test gather_with_concurrency limits the number of running tasks.""" + + runs = 0 + now_time = time.time() + + async def _increment_runs_if_in_time(): + if time.time() - now_time > 0.1: + return -1 + + nonlocal runs + runs += 1 + await asyncio.sleep(0.1) + return runs + + results = await hasync.gather_with_concurrency( + 2, *[_increment_runs_if_in_time() for i in range(4)] + ) + + assert results == [2, 2, -1, -1]