From ece58ce78fe44b521122e3f41b79e670010bc905 Mon Sep 17 00:00:00 2001 From: Pascal Vizeli Date: Sat, 5 Nov 2016 17:27:55 +0100 Subject: [PATCH] Remove ThreadPool with async executor (#4154) * Remove ThreadPool with async executor * Fix zigbee * update unittest * fix remote api * add pending task to remote * fix lint * remove unused import * remove old stuff for lazy tests * fix bug and add a exception handler to executor * change executor handling * change to wait from gather * fix unittest --- homeassistant/bootstrap.py | 9 -- homeassistant/components/sensor/zigbee.py | 2 +- homeassistant/components/zigbee.py | 4 +- homeassistant/core.py | 154 +++++---------------- homeassistant/remote.py | 5 +- homeassistant/util/__init__.py | 109 --------------- tests/common.py | 17 --- tests/components/camera/test_generic.py | 2 - tests/components/camera/test_local_file.py | 2 - tests/test_core.py | 79 +---------- 10 files changed, 37 insertions(+), 346 deletions(-) diff --git a/homeassistant/bootstrap.py b/homeassistant/bootstrap.py index 31e404ad87a..2b6c4711691 100644 --- a/homeassistant/bootstrap.py +++ b/homeassistant/bootstrap.py @@ -165,15 +165,6 @@ def _async_setup_component(hass: core.HomeAssistant, hass.config.components.append(component.DOMAIN) - # Assumption: if a component does not depend on groups - # it communicates with devices - if (not async_comp and - 'group' not in getattr(component, 'DEPENDENCIES', [])): - if hass.pool is None: - hass.async_init_pool() - if hass.pool.worker_count <= 10: - hass.pool.add_worker() - hass.bus.async_fire( EVENT_COMPONENT_LOADED, {ATTR_COMPONENT: component.DOMAIN} ) diff --git a/homeassistant/components/sensor/zigbee.py b/homeassistant/components/sensor/zigbee.py index 7d4ead138e3..78dab2547f3 100644 --- a/homeassistant/components/sensor/zigbee.py +++ b/homeassistant/components/sensor/zigbee.py @@ -55,7 +55,7 @@ class ZigBeeTemperatureSensor(Entity): self._config = config self._temp = None # Get initial state - hass.add_job(self.update_ha_state, True) + hass.add_job(self.async_update_ha_state, True) @property def name(self): diff --git a/homeassistant/components/zigbee.py b/homeassistant/components/zigbee.py index a428d03efc1..8fbc3f0a9c3 100644 --- a/homeassistant/components/zigbee.py +++ b/homeassistant/components/zigbee.py @@ -307,7 +307,7 @@ class ZigBeeDigitalIn(Entity): subscribe(hass, handle_frame) # Get initial state - hass.add_job(self.update_ha_state, True) + hass.add_job(self.async_update_ha_state, True) @property def name(self): @@ -433,7 +433,7 @@ class ZigBeeAnalogIn(Entity): subscribe(hass, handle_frame) # Get initial state - hass.add_job(self.update_ha_state, True) + hass.add_job(self.async_update_ha_state, True) @property def name(self): diff --git a/homeassistant/core.py b/homeassistant/core.py index 8de1e2b2535..5f340bdd941 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -14,7 +14,7 @@ import re import signal import sys import threading -import time +import weakref from types import MappingProxyType from typing import Optional, Any, Callable, List # NOQA @@ -53,16 +53,11 @@ TIMER_INTERVAL = 1 # seconds # How long we wait for the result of a service call SERVICE_CALL_LIMIT = 10 # seconds -# Define number of MINIMUM worker threads. -# During bootstrap of HA (see bootstrap._setup_component()) worker threads -# will be added for each component that polls devices. -MIN_WORKER_THREAD = 2 - # Pattern for validating entity IDs (format: .) ENTITY_ID_PATTERN = re.compile(r"^(\w+)\.(\w+)$") -# Interval at which we check if the pool is getting busy -MONITOR_POOL_INTERVAL = 30 +# Size of a executor pool +EXECUTOR_POOL_SIZE = 10 _LOGGER = logging.getLogger(__name__) @@ -115,7 +110,7 @@ class HomeAssistant(object): self.executor = ThreadPoolExecutor(max_workers=5) self.loop.set_default_executor(self.executor) self.loop.set_exception_handler(self._async_exception_handler) - self.pool = None + self._pending_tasks = weakref.WeakSet() self.bus = EventBus(self) self.services = ServiceRegistry(self.bus, self.async_add_job, self.loop) @@ -190,20 +185,16 @@ class HomeAssistant(object): self.loop._thread_ident = threading.get_ident() _async_create_timer(self) self.bus.async_fire(EVENT_HOMEASSISTANT_START) - if self.pool is not None: - yield from self.loop.run_in_executor( - None, self.pool.block_till_done) self.state = CoreState.running def add_job(self, target: Callable[..., None], *args: Any) -> None: - """Add job to the worker pool. + """Add job to the executor pool. target: target to call. args: parameters for method to call. """ - if self.pool is None: - run_callback_threadsafe(self.pool, self.async_init_pool).result() - self.pool.add_job((target,) + args) + run_callback_threadsafe( + self.loop, self.async_add_job, target, *args).result() @callback def async_add_job(self, target: Callable[..., None], *args: Any) -> None: @@ -214,14 +205,18 @@ class HomeAssistant(object): target: target to call. args: parameters for method to call. """ + task = None + if is_callback(target): self.loop.call_soon(target, *args) elif asyncio.iscoroutinefunction(target): - self.loop.create_task(target(*args)) + task = self.loop.create_task(target(*args)) else: - if self.pool is None: - self.async_init_pool() - self.pool.add_job((target,) + args) + task = self.loop.run_in_executor(None, target, *args) + + # if a task is sheduled + if task is not None: + self._pending_tasks.add(task) @callback def async_run_job(self, target: Callable[..., None], *args: Any) -> None: @@ -249,37 +244,21 @@ class HomeAssistant(object): def block_till_done(self) -> None: """Block till all pending work is done.""" - complete = threading.Event() + run_coroutine_threadsafe( + self.async_block_till_done(), loop=self.loop).result() - @asyncio.coroutine - def sleep_wait(): - """Sleep in thread pool.""" - yield from self.loop.run_in_executor(None, time.sleep, 0) + @asyncio.coroutine + def async_block_till_done(self): + """Block till all pending work is done.""" + while True: + # Wait for the pending tasks are down + if len(self._pending_tasks) > 0: + yield from asyncio.wait(self._pending_tasks, loop=self.loop) - def notify_when_done(): - """Notify event loop when pool done.""" - count = 0 - while True: - # Wait for the work queue to empty - if self.pool is not None: - self.pool.block_till_done() - - # Verify the loop is empty - if self._loop_empty(): - count += 1 - - if count == 2: - break - - # sleep in the loop executor, this forces execution back into - # the event loop to avoid the block thread from starving the - # async loop - run_coroutine_threadsafe(sleep_wait(), self.loop).result() - - complete.set() - - threading.Thread(name="BlockThread", target=notify_when_done).start() - complete.wait() + # Verify the loop is empty + ret = yield from self.loop.run_in_executor(None, self._loop_empty) + if ret: + break def stop(self) -> None: """Stop Home Assistant and shuts down all threads.""" @@ -293,10 +272,7 @@ class HomeAssistant(object): """ self.state = CoreState.stopping self.bus.async_fire(EVENT_HOMEASSISTANT_STOP) - if self.pool is not None: - yield from self.loop.run_in_executor( - None, self.pool.block_till_done) - yield from self.loop.run_in_executor(None, self.pool.stop) + yield from self.async_block_till_done() self.executor.shutdown() if self._websession is not None: yield from self._websession.close() @@ -323,23 +299,17 @@ class HomeAssistant(object): exc_info=exc_info ) - @callback - def async_init_pool(self): - """Initialize the worker pool.""" - self.pool = create_worker_pool() - _async_monitor_worker_pool(self) - @callback def _async_stop_handler(self, *args): """Stop Home Assistant.""" self.exit_code = 0 - self.async_add_job(self.async_stop) + self.loop.create_task(self.async_stop()) @callback def _async_restart_handler(self, *args): """Restart Home Assistant.""" self.exit_code = RESTART_EXIT_CODE - self.async_add_job(self.async_stop) + self.loop.create_task(self.async_stop()) class EventOrigin(enum.Enum): @@ -1196,65 +1166,3 @@ def _async_create_timer(hass, interval=TIMER_INTERVAL): hass.loop.create_task(timer(interval, stop_event)) hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, start_timer) - - -def create_worker_pool(worker_count=None): - """Create a worker pool.""" - if worker_count is None: - worker_count = MIN_WORKER_THREAD - - def job_handler(job): - """Called whenever a job is available to do.""" - try: - func, *args = job - func(*args) - except Exception: # pylint: disable=broad-except - # Catch any exception our service/event_listener might throw - # We do not want to crash our ThreadPool - _LOGGER.exception("BusHandler:Exception doing job") - - return util.ThreadPool(job_handler, worker_count) - - -def _async_monitor_worker_pool(hass): - """Create a monitor for the thread pool to check if pool is misbehaving.""" - busy_threshold = hass.pool.worker_count * 3 - - handle = None - - def schedule(): - """Schedule the monitor.""" - nonlocal handle - handle = hass.loop.call_later(MONITOR_POOL_INTERVAL, - check_pool_threshold) - - def check_pool_threshold(): - """Check pool size.""" - nonlocal busy_threshold - - pending_jobs = hass.pool.queue_size - - if pending_jobs < busy_threshold: - schedule() - return - - _LOGGER.warning( - "WorkerPool:All %d threads are busy and %d jobs pending", - hass.pool.worker_count, pending_jobs) - - for start, job in hass.pool.current_jobs: - _LOGGER.warning("WorkerPool:Current job started at %s: %s", - dt_util.as_local(start).isoformat(), job) - - busy_threshold *= 2 - - schedule() - - schedule() - - @callback - def stop_monitor(event): - """Stop the monitor.""" - handle.cancel() - - hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_monitor) diff --git a/homeassistant/remote.py b/homeassistant/remote.py index ad616de5544..ae96a397826 100644 --- a/homeassistant/remote.py +++ b/homeassistant/remote.py @@ -16,6 +16,7 @@ import logging import time import threading import urllib.parse +import weakref from typing import Optional @@ -127,7 +128,7 @@ class HomeAssistant(ha.HomeAssistant): self.executor = ThreadPoolExecutor(max_workers=5) self.loop.set_default_executor(self.executor) self.loop.set_exception_handler(self._async_exception_handler) - self.pool = ha.create_worker_pool() + self._pending_tasks = weakref.WeakSet() self.bus = EventBus(remote_api, self) self.services = ha.ServiceRegistry(self.bus, self.add_job, self.loop) @@ -176,8 +177,6 @@ class HomeAssistant(ha.HomeAssistant): self.bus.fire(ha.EVENT_HOMEASSISTANT_STOP, origin=ha.EventOrigin.remote) - self.pool.stop() - # Disconnect master event forwarding disconnect_remote_events(self.remote_api, self.config.api) self.state = ha.CoreState.not_running diff --git a/homeassistant/util/__init__.py b/homeassistant/util/__init__.py index fe769f51129..de16a2d23d2 100644 --- a/homeassistant/util/__init__.py +++ b/homeassistant/util/__init__.py @@ -2,7 +2,6 @@ from collections.abc import MutableSet from itertools import chain import threading -import queue from datetime import datetime import re import enum @@ -302,111 +301,3 @@ class Throttle(object): throttle[0].release() return wrapper - - -class ThreadPool(object): - """A priority queue-based thread pool.""" - - def __init__(self, job_handler, worker_count=0): - """Initialize the pool. - - job_handler: method to be called from worker thread to handle job - worker_count: number of threads to run that handle jobs - busy_callback: method to be called when queue gets too big. - Parameters: worker_count, list of current_jobs, - pending_jobs_count - """ - self._job_handler = job_handler - - self.worker_count = 0 - self._work_queue = queue.Queue() - self.current_jobs = [] - self._quit_task = object() - - self.running = True - - for _ in range(worker_count): - self.add_worker() - - @property - def queue_size(self): - """Return estimated number of jobs that are waiting to be processed.""" - return self._work_queue.qsize() - - def add_worker(self): - """Add worker to the thread pool and reset warning limit.""" - if not self.running: - raise RuntimeError("ThreadPool not running") - - threading.Thread( - target=self._worker, daemon=True, - name='ThreadPool Worker {}'.format(self.worker_count)).start() - - self.worker_count += 1 - - def remove_worker(self): - """Remove worker from the thread pool and reset warning limit.""" - if not self.running: - raise RuntimeError("ThreadPool not running") - - self._work_queue.put(self._quit_task) - - self.worker_count -= 1 - - def add_job(self, job): - """Add a job to the queue.""" - if not self.running: - raise RuntimeError("ThreadPool not running") - - self._work_queue.put(job) - - def add_many_jobs(self, jobs): - """Add a list of jobs to the queue.""" - if not self.running: - raise RuntimeError("ThreadPool not running") - - for job in jobs: - self._work_queue.put(job) - - def block_till_done(self): - """Block till current work is done.""" - self._work_queue.join() - - def stop(self): - """Finish all the jobs and stops all the threads.""" - self.block_till_done() - - if not self.running: - return - - # Tell the workers to quit - for _ in range(self.worker_count): - self.remove_worker() - - self.running = False - - # Wait till all workers have quit - self.block_till_done() - - def _worker(self): - """Handle jobs for the thread pool.""" - while True: - # Get new item from work_queue - job = self._work_queue.get() - - if job is self._quit_task: - self._work_queue.task_done() - return - - # Add to current running jobs - job_log = (utcnow(), job) - self.current_jobs.append(job_log) - - # Do the job - self._job_handler(job) - - # Remove from current running job - self.current_jobs.remove(job_log) - - # Tell work_queue the task is done - self._work_queue.task_done() diff --git a/tests/common.py b/tests/common.py index d665e17a503..fd72a6b635b 100644 --- a/tests/common.py +++ b/tests/common.py @@ -40,7 +40,6 @@ def get_test_home_assistant(): loop = asyncio.new_event_loop() hass = loop.run_until_complete(async_test_home_assistant(loop)) - hass.allow_pool = True # FIXME should not be a daemon. Means hass.stop() not called in teardown stop_event = threading.Event() @@ -97,8 +96,6 @@ def async_test_home_assistant(loop): hass.state = ha.CoreState.running - hass.allow_pool = False - # Mock async_start orig_start = hass.async_start @@ -110,20 +107,6 @@ def async_test_home_assistant(loop): hass.async_start = mock_async_start - # Mock async_init_pool - orig_init = hass.async_init_pool - - @ha.callback - def mock_async_init_pool(): - """Prevent worker pool from being initialized.""" - if hass.allow_pool: - with patch('homeassistant.core._async_monitor_worker_pool'): - orig_init() - else: - assert False, 'Thread pool not allowed. Set hass.allow_pool = True' - - hass.async_init_pool = mock_async_init_pool - return hass diff --git a/tests/components/camera/test_generic.py b/tests/components/camera/test_generic.py index fde4bb2fbd4..e2ce9c15936 100644 --- a/tests/components/camera/test_generic.py +++ b/tests/components/camera/test_generic.py @@ -8,7 +8,6 @@ from homeassistant.bootstrap import setup_component @asyncio.coroutine def test_fetching_url(aioclient_mock, hass, test_client): """Test that it fetches the given url.""" - hass.allow_pool = True aioclient_mock.get('http://example.com', text='hello world') def setup_platform(): @@ -40,7 +39,6 @@ def test_fetching_url(aioclient_mock, hass, test_client): @asyncio.coroutine def test_limit_refetch(aioclient_mock, hass, test_client): """Test that it fetches the given url.""" - hass.allow_pool = True aioclient_mock.get('http://example.com/5a', text='hello world') aioclient_mock.get('http://example.com/10a', text='hello world') aioclient_mock.get('http://example.com/15a', text='hello planet') diff --git a/tests/components/camera/test_local_file.py b/tests/components/camera/test_local_file.py index 9a692b0a4ee..d43c138c570 100644 --- a/tests/components/camera/test_local_file.py +++ b/tests/components/camera/test_local_file.py @@ -14,8 +14,6 @@ from tests.common import assert_setup_component, mock_http_component @asyncio.coroutine def test_loading_file(hass, test_client): """Test that it loads image from disk.""" - hass.allow_pool = True - @mock.patch('os.path.isfile', mock.Mock(return_value=True)) @mock.patch('os.access', mock.Mock(return_value=True)) def setup_platform(): diff --git a/tests/test_core.py b/tests/test_core.py index 8a9fb8f6d4a..d3a2d4f353f 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -56,7 +56,7 @@ def test_async_add_job_add_threaded_job_to_pool(mock_iscoro): ha.HomeAssistant.async_add_job(hass, job) assert len(hass.loop.call_soon.mock_calls) == 0 assert len(hass.loop.create_task.mock_calls) == 0 - assert len(hass.pool.add_job.mock_calls) == 1 + assert len(hass.loop.run_in_executor.mock_calls) == 1 def test_async_run_job_calls_callback(): @@ -195,7 +195,6 @@ class TestEventBus(unittest.TestCase): def test_unsubscribe_listener(self): """Test unsubscribe listener from returned function.""" - self.hass.allow_pool = False calls = [] @ha.callback @@ -219,7 +218,6 @@ class TestEventBus(unittest.TestCase): def test_listen_once_event_with_callback(self): """Test listen_once_event method.""" - self.hass.allow_pool = False runs = [] @ha.callback @@ -237,7 +235,6 @@ class TestEventBus(unittest.TestCase): def test_listen_once_event_with_coroutine(self): """Test listen_once_event method.""" - self.hass.allow_pool = False runs = [] @asyncio.coroutine @@ -283,7 +280,6 @@ class TestEventBus(unittest.TestCase): def test_callback_event_listener(self): """Test a event listener listeners.""" - self.hass.allow_pool = False callback_calls = [] @ha.callback @@ -297,7 +293,6 @@ class TestEventBus(unittest.TestCase): def test_coroutine_event_listener(self): """Test a event listener listeners.""" - self.hass.allow_pool = False coroutine_calls = [] @asyncio.coroutine @@ -376,7 +371,6 @@ class TestStateMachine(unittest.TestCase): self.states = self.hass.states self.states.set("light.Bowl", "on") self.states.set("switch.AC", "off") - self.hass.allow_pool = False # pylint: disable=invalid-name def tearDown(self): @@ -523,7 +517,6 @@ class TestServiceRegistry(unittest.TestCase): def test_has_service(self): """Test has_service method.""" - self.hass.allow_pool = False self.assertTrue( self.services.has_service("tesT_domaiN", "tesT_servicE")) self.assertFalse( @@ -533,7 +526,6 @@ class TestServiceRegistry(unittest.TestCase): def test_services(self): """Test services.""" - self.hass.allow_pool = False expected = { 'test_domain': {'test_service': {'description': '', 'fields': {}}} } @@ -556,7 +548,6 @@ class TestServiceRegistry(unittest.TestCase): def test_call_non_existing_with_blocking(self): """Test non-existing with blocking.""" - self.hass.allow_pool = False prior = ha.SERVICE_CALL_LIMIT try: ha.SERVICE_CALL_LIMIT = 0.01 @@ -567,7 +558,6 @@ class TestServiceRegistry(unittest.TestCase): def test_async_service(self): """Test registering and calling an async service.""" - self.hass.allow_pool = False calls = [] @asyncio.coroutine @@ -584,7 +574,6 @@ class TestServiceRegistry(unittest.TestCase): def test_callback_service(self): """Test registering and calling an async service.""" - self.hass.allow_pool = False calls = [] @ha.callback @@ -638,72 +627,6 @@ class TestConfig(unittest.TestCase): self.assertEqual(expected, self.config.as_dict()) -class TestWorkerPool(unittest.TestCase): - """Test WorkerPool methods.""" - - def test_exception_during_job(self): - """Test exception during a job.""" - pool = ha.create_worker_pool(1) - - def malicious_job(_): - raise Exception("Test breaking worker pool") - - calls = [] - - def register_call(_): - calls.append(1) - - pool.add_job((malicious_job, None)) - pool.block_till_done() - pool.add_job((register_call, None)) - pool.block_till_done() - self.assertEqual(1, len(calls)) - - -class TestWorkerPoolMonitor(object): - """Test monitor_worker_pool.""" - - @patch('homeassistant.core._LOGGER.warning') - def test_worker_pool_monitor(self, mock_warning, event_loop): - """Test we log an error and increase threshold.""" - hass = MagicMock() - hass.pool.worker_count = 3 - schedule_handle = MagicMock() - hass.loop.call_later.return_value = schedule_handle - - ha._async_monitor_worker_pool(hass) - assert hass.loop.call_later.called - assert hass.bus.async_listen_once.called - assert not schedule_handle.called - - check_threshold = hass.loop.call_later.mock_calls[0][1][1] - - hass.pool.queue_size = 8 - check_threshold() - assert not mock_warning.called - - hass.pool.queue_size = 9 - check_threshold() - assert mock_warning.called - - mock_warning.reset_mock() - assert not mock_warning.called - - check_threshold() - assert not mock_warning.called - - hass.pool.queue_size = 17 - check_threshold() - assert not mock_warning.called - - hass.pool.queue_size = 18 - check_threshold() - assert mock_warning.called - - hass.bus.async_listen_once.mock_calls[0][1][1](None) - assert schedule_handle.cancel.called - - class TestAsyncCreateTimer(object): """Test create timer."""