diff --git a/homeassistant/core.py b/homeassistant/core.py index 5ec94f69045..c1c26abcd87 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -56,6 +56,9 @@ MIN_WORKER_THREAD = 2 # Pattern for validating entity IDs (format: .) ENTITY_ID_PATTERN = re.compile(r"^(\w+)\.(\w+)$") +# Interval at which we check if the pool is getting busy +MONITOR_POOL_INTERVAL = 30 + _LOGGER = logging.getLogger(__name__) @@ -190,7 +193,8 @@ class HomeAssistant(object): This method is a coroutine. """ - create_timer(self) + async_create_timer(self) + async_monitor_worker_pool(self) self.bus.async_fire(EVENT_HOMEASSISTANT_START) yield from self.loop.run_in_executor(None, self.pool.block_till_done) self.state = CoreState.running @@ -1075,13 +1079,8 @@ class Config(object): } -def create_timer(hass, interval=TIMER_INTERVAL): +def async_create_timer(hass, interval=TIMER_INTERVAL): """Create a timer that will start on HOMEASSISTANT_START.""" - # We want to be able to fire every time a minute starts (seconds=0). - # We want this so other modules can use that to make sure they fire - # every minute. - assert 60 % interval == 0, "60 % TIMER_INTERVAL should be 0!" - stop_event = asyncio.Event(loop=hass.loop) # Setting the Event inside the loop by marking it as a coroutine @@ -1160,14 +1159,48 @@ def create_worker_pool(worker_count=None): # We do not want to crash our ThreadPool _LOGGER.exception("BusHandler:Exception doing job") - def busy_callback(worker_count, current_jobs, pending_jobs_count): - """Callback to be called when the pool queue gets too big.""" + return util.ThreadPool(job_handler, worker_count) + + +def async_monitor_worker_pool(hass): + """Create a monitor for the thread pool to check if pool is misbehaving.""" + busy_threshold = hass.pool.worker_count * 3 + + handle = None + + def schedule(): + """Schedule the monitor.""" + nonlocal handle + handle = hass.loop.call_later(MONITOR_POOL_INTERVAL, + check_pool_threshold) + + def check_pool_threshold(): + """Check pool size.""" + nonlocal busy_threshold + + pending_jobs = hass.pool.queue_size + + if pending_jobs < busy_threshold: + schedule() + return + _LOGGER.warning( "WorkerPool:All %d threads are busy and %d jobs pending", - worker_count, pending_jobs_count) + hass.pool.worker_count, pending_jobs) - for start, job in current_jobs: - _LOGGER.warning("WorkerPool:Current job from %s: %s", + for start, job in hass.pool.current_jobs: + _LOGGER.warning("WorkerPool:Current job started at %s: %s", dt_util.as_local(start).isoformat(), job) - return util.ThreadPool(job_handler, worker_count, busy_callback) + busy_threshold *= 2 + + schedule() + + schedule() + + @asyncio.coroutine + def stop_monitor(event): + """Stop the monitor.""" + handle.cancel() + + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, stop_monitor) diff --git a/homeassistant/remote.py b/homeassistant/remote.py index 2edc368424f..8725990f146 100644 --- a/homeassistant/remote.py +++ b/homeassistant/remote.py @@ -143,7 +143,7 @@ class HomeAssistant(ha.HomeAssistant): 'Unable to setup local API to receive events') self.state = ha.CoreState.starting - ha.create_timer(self) + ha.async_create_timer(self) self.bus.fire(ha.EVENT_HOMEASSISTANT_START, origin=ha.EventOrigin.remote) diff --git a/homeassistant/util/__init__.py b/homeassistant/util/__init__.py index a9df428513a..c331dabcab2 100644 --- a/homeassistant/util/__init__.py +++ b/homeassistant/util/__init__.py @@ -308,7 +308,7 @@ class ThreadPool(object): """A priority queue-based thread pool.""" # pylint: disable=too-many-instance-attributes - def __init__(self, job_handler, worker_count=0, busy_callback=None): + def __init__(self, job_handler, worker_count=0): """Initialize the pool. job_handler: method to be called from worker thread to handle job @@ -318,13 +318,10 @@ class ThreadPool(object): pending_jobs_count """ self._job_handler = job_handler - self._busy_callback = busy_callback self.worker_count = 0 - self.busy_warning_limit = 0 self._work_queue = queue.PriorityQueue() self.current_jobs = [] - self._lock = threading.RLock() self._quit_task = object() self.running = True @@ -332,70 +329,45 @@ class ThreadPool(object): for _ in range(worker_count): self.add_worker() + @property + def queue_size(self): + """Return estimated number of jobs that are waiting to be processed.""" + return self._work_queue.qsize() + def add_worker(self): """Add worker to the thread pool and reset warning limit.""" - with self._lock: - if not self.running: - raise RuntimeError("ThreadPool not running") + if not self.running: + raise RuntimeError("ThreadPool not running") - worker = threading.Thread( - target=self._worker, - name='ThreadPool Worker {}'.format(self.worker_count)) - worker.daemon = True - worker.start() + threading.Thread( + target=self._worker, daemon=True, + name='ThreadPool Worker {}'.format(self.worker_count)).start() - self.worker_count += 1 - self.busy_warning_limit = self.worker_count * 3 + self.worker_count += 1 def remove_worker(self): """Remove worker from the thread pool and reset warning limit.""" - with self._lock: - if not self.running: - raise RuntimeError("ThreadPool not running") + if not self.running: + raise RuntimeError("ThreadPool not running") - self._work_queue.put(PriorityQueueItem(0, self._quit_task)) + self._work_queue.put(PriorityQueueItem(0, self._quit_task)) - self.worker_count -= 1 - self.busy_warning_limit = self.worker_count * 3 + self.worker_count -= 1 def add_job(self, priority, job): """Add a job to the queue.""" - with self._lock: - if not self.running: - raise RuntimeError("ThreadPool not running") + if not self.running: + raise RuntimeError("ThreadPool not running") - self._work_queue.put(PriorityQueueItem(priority, job)) - - # Check if our queue is getting too big. - if self._work_queue.qsize() > self.busy_warning_limit \ - and self._busy_callback is not None: - - # Increase limit we will issue next warning. - self.busy_warning_limit *= 2 - - self._busy_callback( - self.worker_count, self.current_jobs, - self._work_queue.qsize()) + self._work_queue.put(PriorityQueueItem(priority, job)) def add_many_jobs(self, jobs): """Add a list of jobs to the queue.""" - with self._lock: - if not self.running: - raise RuntimeError("ThreadPool not running") + if not self.running: + raise RuntimeError("ThreadPool not running") - for priority, job in jobs: - self._work_queue.put(PriorityQueueItem(priority, job)) - - # Check if our queue is getting too big. - if self._work_queue.qsize() > self.busy_warning_limit \ - and self._busy_callback is not None: - - # Increase limit we will issue next warning. - self.busy_warning_limit *= 2 - - self._busy_callback( - self.worker_count, self.current_jobs, - self._work_queue.qsize()) + for priority, job in jobs: + self._work_queue.put(PriorityQueueItem(priority, job)) def block_till_done(self): """Block till current work is done.""" @@ -405,18 +377,17 @@ class ThreadPool(object): """Finish all the jobs and stops all the threads.""" self.block_till_done() - with self._lock: - if not self.running: - return + if not self.running: + return - # Tell the workers to quit - for _ in range(self.worker_count): - self.remove_worker() + # Tell the workers to quit + for _ in range(self.worker_count): + self.remove_worker() - self.running = False + self.running = False - # Wait till all workers have quit - self.block_till_done() + # Wait till all workers have quit + self.block_till_done() def _worker(self): """Handle jobs for the thread pool.""" diff --git a/requirements_test.txt b/requirements_test.txt index a996ef411c3..fd782a66933 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -2,6 +2,7 @@ flake8>=3.0.4 pylint>=1.5.6 coveralls>=1.1 pytest>=2.9.2 +pytest-asyncio>=0.5.0 pytest-cov>=2.3.1 pytest-timeout>=1.0.0 pytest-catchlog>=1.2.2 diff --git a/tests/common.py b/tests/common.py index ffc5d13b2be..ceb9bf3c058 100644 --- a/tests/common.py +++ b/tests/common.py @@ -75,9 +75,11 @@ def get_test_home_assistant(num_threads=None): """Helper to start hass.""" with patch.object(hass.loop, 'run_forever', return_value=None): with patch.object(hass, 'async_stop', return_value=fake_stop()): - with patch.object(ha, 'create_timer', return_value=None): - orig_start() - hass.block_till_done() + with patch.object(ha, 'async_create_timer', return_value=None): + with patch.object(ha, 'async_monitor_worker_pool', + return_value=None): + orig_start() + hass.block_till_done() def stop_hass(): orig_stop() diff --git a/tests/test_core.py b/tests/test_core.py index 118e9909815..9b57f07e9e6 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,7 +4,7 @@ import os import signal import unittest -from unittest.mock import patch +from unittest.mock import patch, MagicMock from datetime import datetime, timedelta import pytz @@ -459,3 +459,84 @@ class TestWorkerPool(unittest.TestCase): pool.add_job(ha.JobPriority.EVENT_DEFAULT, (register_call, None)) pool.block_till_done() self.assertEqual(1, len(calls)) + + +class TestWorkerPoolMonitor(object): + """Test monitor_worker_pool.""" + + @patch('homeassistant.core._LOGGER.warning') + def test_worker_pool_monitor(self, mock_warning, event_loop): + """Test we log an error and increase threshold.""" + hass = MagicMock() + hass.pool.worker_count = 3 + schedule_handle = MagicMock() + hass.loop.call_later.return_value = schedule_handle + + ha.async_monitor_worker_pool(hass) + assert hass.loop.call_later.called + assert hass.bus.async_listen_once.called + assert not schedule_handle.called + + check_threshold = hass.loop.call_later.mock_calls[0][1][1] + + hass.pool.queue_size = 8 + check_threshold() + assert not mock_warning.called + + hass.pool.queue_size = 9 + check_threshold() + assert mock_warning.called + + mock_warning.reset_mock() + assert not mock_warning.called + + check_threshold() + assert not mock_warning.called + + hass.pool.queue_size = 17 + check_threshold() + assert not mock_warning.called + + hass.pool.queue_size = 18 + check_threshold() + assert mock_warning.called + + event_loop.run_until_complete( + hass.bus.async_listen_once.mock_calls[0][1][1](None)) + assert schedule_handle.cancel.called + + +class TestAsyncCreateTimer(object): + """Test create timer.""" + + @patch('homeassistant.core.asyncio.Event') + @patch('homeassistant.core.dt_util.utcnow') + def test_create_timer(self, mock_utcnow, mock_event, event_loop): + """Test create timer fires correctly.""" + hass = MagicMock() + now = mock_utcnow() + event = mock_event() + now.second = 1 + mock_utcnow.reset_mock() + + ha.async_create_timer(hass) + assert len(hass.bus.async_listen_once.mock_calls) == 2 + start_timer = hass.bus.async_listen_once.mock_calls[1][1][1] + + event_loop.run_until_complete(start_timer(None)) + assert hass.loop.create_task.called + + timer = hass.loop.create_task.mock_calls[0][1][0] + event.is_set.side_effect = False, False, True + event_loop.run_until_complete(timer) + assert len(mock_utcnow.mock_calls) == 1 + + assert hass.loop.call_soon.called + event_type, event_data = hass.loop.call_soon.mock_calls[0][1][1:] + + assert ha.EVENT_TIME_CHANGED == event_type + assert {ha.ATTR_NOW: now} == event_data + + stop_timer = hass.bus.async_listen_once.mock_calls[0][1][1] + event_loop.run_until_complete(stop_timer(None)) + assert event.set.called diff --git a/tests/test_remote.py b/tests/test_remote.py index 47aee687f66..a5212face2c 100644 --- a/tests/test_remote.py +++ b/tests/test_remote.py @@ -69,7 +69,7 @@ def setUpModule(): # pylint: disable=invalid-name {http.DOMAIN: {http.CONF_API_PASSWORD: API_PASSWORD, http.CONF_SERVER_PORT: SLAVE_PORT}}) - with patch.object(ha, 'create_timer', return_value=None): + with patch.object(ha, 'async_create_timer', return_value=None): slave.start()