mirror of
https://github.com/home-assistant/core.git
synced 2025-11-09 02:49:40 +00:00
Remove ThreadPool with async executor (#4154)
* Remove ThreadPool with async executor * Fix zigbee * update unittest * fix remote api * add pending task to remote * fix lint * remove unused import * remove old stuff for lazy tests * fix bug and add a exception handler to executor * change executor handling * change to wait from gather * fix unittest
This commit is contained in:
committed by
Paulus Schoutsen
parent
b67f1fed52
commit
ece58ce78f
@@ -14,7 +14,7 @@ import re
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import weakref
|
||||
|
||||
from types import MappingProxyType
|
||||
from typing import Optional, Any, Callable, List # NOQA
|
||||
@@ -53,16 +53,11 @@ TIMER_INTERVAL = 1 # seconds
|
||||
# How long we wait for the result of a service call
|
||||
SERVICE_CALL_LIMIT = 10 # seconds
|
||||
|
||||
# Define number of MINIMUM worker threads.
|
||||
# During bootstrap of HA (see bootstrap._setup_component()) worker threads
|
||||
# will be added for each component that polls devices.
|
||||
MIN_WORKER_THREAD = 2
|
||||
|
||||
# Pattern for validating entity IDs (format: <domain>.<entity>)
|
||||
ENTITY_ID_PATTERN = re.compile(r"^(\w+)\.(\w+)$")
|
||||
|
||||
# Interval at which we check if the pool is getting busy
|
||||
MONITOR_POOL_INTERVAL = 30
|
||||
# Size of a executor pool
|
||||
EXECUTOR_POOL_SIZE = 10
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@@ -115,7 +110,7 @@ class HomeAssistant(object):
|
||||
self.executor = ThreadPoolExecutor(max_workers=5)
|
||||
self.loop.set_default_executor(self.executor)
|
||||
self.loop.set_exception_handler(self._async_exception_handler)
|
||||
self.pool = None
|
||||
self._pending_tasks = weakref.WeakSet()
|
||||
self.bus = EventBus(self)
|
||||
self.services = ServiceRegistry(self.bus, self.async_add_job,
|
||||
self.loop)
|
||||
@@ -190,20 +185,16 @@ class HomeAssistant(object):
|
||||
self.loop._thread_ident = threading.get_ident()
|
||||
_async_create_timer(self)
|
||||
self.bus.async_fire(EVENT_HOMEASSISTANT_START)
|
||||
if self.pool is not None:
|
||||
yield from self.loop.run_in_executor(
|
||||
None, self.pool.block_till_done)
|
||||
self.state = CoreState.running
|
||||
|
||||
def add_job(self, target: Callable[..., None], *args: Any) -> None:
|
||||
"""Add job to the worker pool.
|
||||
"""Add job to the executor pool.
|
||||
|
||||
target: target to call.
|
||||
args: parameters for method to call.
|
||||
"""
|
||||
if self.pool is None:
|
||||
run_callback_threadsafe(self.pool, self.async_init_pool).result()
|
||||
self.pool.add_job((target,) + args)
|
||||
run_callback_threadsafe(
|
||||
self.loop, self.async_add_job, target, *args).result()
|
||||
|
||||
@callback
|
||||
def async_add_job(self, target: Callable[..., None], *args: Any) -> None:
|
||||
@@ -214,14 +205,18 @@ class HomeAssistant(object):
|
||||
target: target to call.
|
||||
args: parameters for method to call.
|
||||
"""
|
||||
task = None
|
||||
|
||||
if is_callback(target):
|
||||
self.loop.call_soon(target, *args)
|
||||
elif asyncio.iscoroutinefunction(target):
|
||||
self.loop.create_task(target(*args))
|
||||
task = self.loop.create_task(target(*args))
|
||||
else:
|
||||
if self.pool is None:
|
||||
self.async_init_pool()
|
||||
self.pool.add_job((target,) + args)
|
||||
task = self.loop.run_in_executor(None, target, *args)
|
||||
|
||||
# if a task is sheduled
|
||||
if task is not None:
|
||||
self._pending_tasks.add(task)
|
||||
|
||||
@callback
|
||||
def async_run_job(self, target: Callable[..., None], *args: Any) -> None:
|
||||
@@ -249,37 +244,21 @@ class HomeAssistant(object):
|
||||
|
||||
def block_till_done(self) -> None:
|
||||
"""Block till all pending work is done."""
|
||||
complete = threading.Event()
|
||||
run_coroutine_threadsafe(
|
||||
self.async_block_till_done(), loop=self.loop).result()
|
||||
|
||||
@asyncio.coroutine
|
||||
def sleep_wait():
|
||||
"""Sleep in thread pool."""
|
||||
yield from self.loop.run_in_executor(None, time.sleep, 0)
|
||||
@asyncio.coroutine
|
||||
def async_block_till_done(self):
|
||||
"""Block till all pending work is done."""
|
||||
while True:
|
||||
# Wait for the pending tasks are down
|
||||
if len(self._pending_tasks) > 0:
|
||||
yield from asyncio.wait(self._pending_tasks, loop=self.loop)
|
||||
|
||||
def notify_when_done():
|
||||
"""Notify event loop when pool done."""
|
||||
count = 0
|
||||
while True:
|
||||
# Wait for the work queue to empty
|
||||
if self.pool is not None:
|
||||
self.pool.block_till_done()
|
||||
|
||||
# Verify the loop is empty
|
||||
if self._loop_empty():
|
||||
count += 1
|
||||
|
||||
if count == 2:
|
||||
break
|
||||
|
||||
# sleep in the loop executor, this forces execution back into
|
||||
# the event loop to avoid the block thread from starving the
|
||||
# async loop
|
||||
run_coroutine_threadsafe(sleep_wait(), self.loop).result()
|
||||
|
||||
complete.set()
|
||||
|
||||
threading.Thread(name="BlockThread", target=notify_when_done).start()
|
||||
complete.wait()
|
||||
# Verify the loop is empty
|
||||
ret = yield from self.loop.run_in_executor(None, self._loop_empty)
|
||||
if ret:
|
||||
break
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop Home Assistant and shuts down all threads."""
|
||||
@@ -293,10 +272,7 @@ class HomeAssistant(object):
|
||||
"""
|
||||
self.state = CoreState.stopping
|
||||
self.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||
if self.pool is not None:
|
||||
yield from self.loop.run_in_executor(
|
||||
None, self.pool.block_till_done)
|
||||
yield from self.loop.run_in_executor(None, self.pool.stop)
|
||||
yield from self.async_block_till_done()
|
||||
self.executor.shutdown()
|
||||
if self._websession is not None:
|
||||
yield from self._websession.close()
|
||||
@@ -323,23 +299,17 @@ class HomeAssistant(object):
|
||||
exc_info=exc_info
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_init_pool(self):
|
||||
"""Initialize the worker pool."""
|
||||
self.pool = create_worker_pool()
|
||||
_async_monitor_worker_pool(self)
|
||||
|
||||
@callback
|
||||
def _async_stop_handler(self, *args):
|
||||
"""Stop Home Assistant."""
|
||||
self.exit_code = 0
|
||||
self.async_add_job(self.async_stop)
|
||||
self.loop.create_task(self.async_stop())
|
||||
|
||||
@callback
|
||||
def _async_restart_handler(self, *args):
|
||||
"""Restart Home Assistant."""
|
||||
self.exit_code = RESTART_EXIT_CODE
|
||||
self.async_add_job(self.async_stop)
|
||||
self.loop.create_task(self.async_stop())
|
||||
|
||||
|
||||
class EventOrigin(enum.Enum):
|
||||
@@ -1196,65 +1166,3 @@ def _async_create_timer(hass, interval=TIMER_INTERVAL):
|
||||
hass.loop.create_task(timer(interval, stop_event))
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, start_timer)
|
||||
|
||||
|
||||
def create_worker_pool(worker_count=None):
|
||||
"""Create a worker pool."""
|
||||
if worker_count is None:
|
||||
worker_count = MIN_WORKER_THREAD
|
||||
|
||||
def job_handler(job):
|
||||
"""Called whenever a job is available to do."""
|
||||
try:
|
||||
func, *args = job
|
||||
func(*args)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# Catch any exception our service/event_listener might throw
|
||||
# We do not want to crash our ThreadPool
|
||||
_LOGGER.exception("BusHandler:Exception doing job")
|
||||
|
||||
return util.ThreadPool(job_handler, worker_count)
|
||||
|
||||
|
||||
def _async_monitor_worker_pool(hass):
|
||||
"""Create a monitor for the thread pool to check if pool is misbehaving."""
|
||||
busy_threshold = hass.pool.worker_count * 3
|
||||
|
||||
handle = None
|
||||
|
||||
def schedule():
|
||||
"""Schedule the monitor."""
|
||||
nonlocal handle
|
||||
handle = hass.loop.call_later(MONITOR_POOL_INTERVAL,
|
||||
check_pool_threshold)
|
||||
|
||||
def check_pool_threshold():
|
||||
"""Check pool size."""
|
||||
nonlocal busy_threshold
|
||||
|
||||
pending_jobs = hass.pool.queue_size
|
||||
|
||||
if pending_jobs < busy_threshold:
|
||||
schedule()
|
||||
return
|
||||
|
||||
_LOGGER.warning(
|
||||
"WorkerPool:All %d threads are busy and %d jobs pending",
|
||||
hass.pool.worker_count, pending_jobs)
|
||||
|
||||
for start, job in hass.pool.current_jobs:
|
||||
_LOGGER.warning("WorkerPool:Current job started at %s: %s",
|
||||
dt_util.as_local(start).isoformat(), job)
|
||||
|
||||
busy_threshold *= 2
|
||||
|
||||
schedule()
|
||||
|
||||
schedule()
|
||||
|
||||
@callback
|
||||
def stop_monitor(event):
|
||||
"""Stop the monitor."""
|
||||
handle.cancel()
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_monitor)
|
||||
|
||||
Reference in New Issue
Block a user