diff --git a/homeassistant/core.py b/homeassistant/core.py index eea48d536e3..c1be6e760d5 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -58,6 +58,9 @@ ENTITY_ID_PATTERN = re.compile(r"^(\w+)\.(\w+)$") # Size of a executor pool EXECUTOR_POOL_SIZE = 15 +# Time for cleanup internal pending tasks +TIME_INTERVAL_TASKS_CLEANUP = 10 + _LOGGER = logging.getLogger(__name__) @@ -110,6 +113,7 @@ class HomeAssistant(object): self.loop.set_default_executor(self.executor) self.loop.set_exception_handler(self._async_exception_handler) self._pending_tasks = [] + self._pending_sheduler = None self.bus = EventBus(self) self.services = ServiceRegistry(self.bus, self.async_add_job, self.loop) @@ -182,10 +186,24 @@ class HomeAssistant(object): # pylint: disable=protected-access self.loop._thread_ident = threading.get_ident() + self._async_tasks_cleanup() _async_create_timer(self) self.bus.async_fire(EVENT_HOMEASSISTANT_START) self.state = CoreState.running + @callback + def _async_tasks_cleanup(self): + """Cleanup all pending tasks in a time interval. + + This method must be run in the event loop. + """ + self._pending_tasks = [task for task in self._pending_tasks + if not task.done()] + + # sheduled next cleanup + self._pending_sheduler = self.loop.call_later( + TIME_INTERVAL_TASKS_CLEANUP, self._async_tasks_cleanup) + def add_job(self, target: Callable[..., None], *args: Any) -> None: """Add job to the executor pool. @@ -219,11 +237,6 @@ class HomeAssistant(object): if task is not None: self._pending_tasks.append(task) - # cleanup - if len(self._pending_tasks) > 50: - self._pending_tasks = [sheduled for sheduled in self._pending_tasks - if not sheduled.done()] - @callback def async_run_job(self, target: Callable[..., None], *args: Any) -> None: """Run a job from within the event loop. @@ -281,6 +294,8 @@ class HomeAssistant(object): """ self.state = CoreState.stopping self.bus.async_fire(EVENT_HOMEASSISTANT_STOP) + if self._pending_sheduler is not None: + self._pending_sheduler.cancel() yield from self.async_block_till_done() self.executor.shutdown() if self._websession is not None: diff --git a/homeassistant/remote.py b/homeassistant/remote.py index ba69d3249a9..c4293680ec5 100644 --- a/homeassistant/remote.py +++ b/homeassistant/remote.py @@ -128,6 +128,7 @@ class HomeAssistant(ha.HomeAssistant): self.loop.set_default_executor(self.executor) self.loop.set_exception_handler(self._async_exception_handler) self._pending_tasks = [] + self._pending_sheduler = None self.bus = EventBus(remote_api, self) self.services = ha.ServiceRegistry(self.bus, self.add_job, self.loop) diff --git a/tests/common.py b/tests/common.py index 6f017d29b46..244ad0b6723 100644 --- a/tests/common.py +++ b/tests/common.py @@ -102,7 +102,8 @@ def async_test_home_assistant(loop): @asyncio.coroutine def mock_async_start(): with patch.object(loop, 'add_signal_handler'), \ - patch('homeassistant.core._async_create_timer'): + patch('homeassistant.core._async_create_timer'), \ + patch.object(hass, '_async_tasks_cleanup', return_value=None): yield from orig_start() hass.async_start = mock_async_start diff --git a/tests/test_core.py b/tests/test_core.py index 71830a20719..60057e57ad1 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -9,6 +9,8 @@ import pytz import homeassistant.core as ha from homeassistant.exceptions import InvalidEntityFormatError +from homeassistant.util.async import ( + run_callback_threadsafe, run_coroutine_threadsafe) import homeassistant.util.dt as dt_util from homeassistant.util.unit_system import (METRIC_SYSTEM) from homeassistant.const import ( @@ -118,7 +120,7 @@ class TestHomeAssistant(unittest.TestCase): # self.assertEqual(1, len(calls)) - def test_async_add_job_pending_tasks_add(self): + def test_pending_sheduler(self): """Add a coro to pending tasks.""" call_count = [] @@ -127,14 +129,24 @@ class TestHomeAssistant(unittest.TestCase): """Test Coro.""" call_count.append('call') - self.hass.add_job(test_coro()) + for i in range(50): + self.hass.add_job(test_coro()) - assert len(self.hass._pending_tasks) == 1 - self.hass.block_till_done() - assert len(call_count) == 1 + run_coroutine_threadsafe( + asyncio.wait(self.hass._pending_tasks, loop=self.hass.loop), + loop=self.hass.loop + ).result() - def test_async_add_job_pending_tasks_cleanup(self): - """Add a coro to pending tasks and test cleanup.""" + with patch.object(self.hass.loop, 'call_later') as mock_later: + run_callback_threadsafe( + self.hass.loop, self.hass._async_tasks_cleanup).result() + assert mock_later.called + + assert len(self.hass._pending_tasks) == 0 + assert len(call_count) == 50 + + def test_async_add_job_pending_tasks_coro(self): + """Add a coro to pending tasks.""" call_count = [] @asyncio.coroutine @@ -149,12 +161,6 @@ class TestHomeAssistant(unittest.TestCase): self.hass.block_till_done() assert len(call_count) == 50 - self.hass.add_job(test_coro()) - - assert len(self.hass._pending_tasks) == 1 - self.hass.block_till_done() - assert len(call_count) == 51 - def test_async_add_job_pending_tasks_executor(self): """Run a executor in pending tasks.""" call_count = []