mirror of
https://github.com/home-assistant/core.git
synced 2025-07-16 17:57:11 +00:00
Change pending task sheduler to time based cleanup (#4324)
* Change pending task sheduler to time based cleanup * update unittest
This commit is contained in:
parent
04dbc992ec
commit
71da21dcc8
@ -58,6 +58,9 @@ ENTITY_ID_PATTERN = re.compile(r"^(\w+)\.(\w+)$")
|
|||||||
# Size of a executor pool
|
# Size of a executor pool
|
||||||
EXECUTOR_POOL_SIZE = 15
|
EXECUTOR_POOL_SIZE = 15
|
||||||
|
|
||||||
|
# Time for cleanup internal pending tasks
|
||||||
|
TIME_INTERVAL_TASKS_CLEANUP = 10
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -110,6 +113,7 @@ class HomeAssistant(object):
|
|||||||
self.loop.set_default_executor(self.executor)
|
self.loop.set_default_executor(self.executor)
|
||||||
self.loop.set_exception_handler(self._async_exception_handler)
|
self.loop.set_exception_handler(self._async_exception_handler)
|
||||||
self._pending_tasks = []
|
self._pending_tasks = []
|
||||||
|
self._pending_sheduler = None
|
||||||
self.bus = EventBus(self)
|
self.bus = EventBus(self)
|
||||||
self.services = ServiceRegistry(self.bus, self.async_add_job,
|
self.services = ServiceRegistry(self.bus, self.async_add_job,
|
||||||
self.loop)
|
self.loop)
|
||||||
@ -182,10 +186,24 @@ class HomeAssistant(object):
|
|||||||
|
|
||||||
# pylint: disable=protected-access
|
# pylint: disable=protected-access
|
||||||
self.loop._thread_ident = threading.get_ident()
|
self.loop._thread_ident = threading.get_ident()
|
||||||
|
self._async_tasks_cleanup()
|
||||||
_async_create_timer(self)
|
_async_create_timer(self)
|
||||||
self.bus.async_fire(EVENT_HOMEASSISTANT_START)
|
self.bus.async_fire(EVENT_HOMEASSISTANT_START)
|
||||||
self.state = CoreState.running
|
self.state = CoreState.running
|
||||||
|
|
||||||
|
@callback
|
||||||
|
def _async_tasks_cleanup(self):
|
||||||
|
"""Cleanup all pending tasks in a time interval.
|
||||||
|
|
||||||
|
This method must be run in the event loop.
|
||||||
|
"""
|
||||||
|
self._pending_tasks = [task for task in self._pending_tasks
|
||||||
|
if not task.done()]
|
||||||
|
|
||||||
|
# sheduled next cleanup
|
||||||
|
self._pending_sheduler = self.loop.call_later(
|
||||||
|
TIME_INTERVAL_TASKS_CLEANUP, self._async_tasks_cleanup)
|
||||||
|
|
||||||
def add_job(self, target: Callable[..., None], *args: Any) -> None:
|
def add_job(self, target: Callable[..., None], *args: Any) -> None:
|
||||||
"""Add job to the executor pool.
|
"""Add job to the executor pool.
|
||||||
|
|
||||||
@ -219,11 +237,6 @@ class HomeAssistant(object):
|
|||||||
if task is not None:
|
if task is not None:
|
||||||
self._pending_tasks.append(task)
|
self._pending_tasks.append(task)
|
||||||
|
|
||||||
# cleanup
|
|
||||||
if len(self._pending_tasks) > 50:
|
|
||||||
self._pending_tasks = [sheduled for sheduled in self._pending_tasks
|
|
||||||
if not sheduled.done()]
|
|
||||||
|
|
||||||
@callback
|
@callback
|
||||||
def async_run_job(self, target: Callable[..., None], *args: Any) -> None:
|
def async_run_job(self, target: Callable[..., None], *args: Any) -> None:
|
||||||
"""Run a job from within the event loop.
|
"""Run a job from within the event loop.
|
||||||
@ -281,6 +294,8 @@ class HomeAssistant(object):
|
|||||||
"""
|
"""
|
||||||
self.state = CoreState.stopping
|
self.state = CoreState.stopping
|
||||||
self.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
self.bus.async_fire(EVENT_HOMEASSISTANT_STOP)
|
||||||
|
if self._pending_sheduler is not None:
|
||||||
|
self._pending_sheduler.cancel()
|
||||||
yield from self.async_block_till_done()
|
yield from self.async_block_till_done()
|
||||||
self.executor.shutdown()
|
self.executor.shutdown()
|
||||||
if self._websession is not None:
|
if self._websession is not None:
|
||||||
|
@ -128,6 +128,7 @@ class HomeAssistant(ha.HomeAssistant):
|
|||||||
self.loop.set_default_executor(self.executor)
|
self.loop.set_default_executor(self.executor)
|
||||||
self.loop.set_exception_handler(self._async_exception_handler)
|
self.loop.set_exception_handler(self._async_exception_handler)
|
||||||
self._pending_tasks = []
|
self._pending_tasks = []
|
||||||
|
self._pending_sheduler = None
|
||||||
|
|
||||||
self.bus = EventBus(remote_api, self)
|
self.bus = EventBus(remote_api, self)
|
||||||
self.services = ha.ServiceRegistry(self.bus, self.add_job, self.loop)
|
self.services = ha.ServiceRegistry(self.bus, self.add_job, self.loop)
|
||||||
|
@ -102,7 +102,8 @@ def async_test_home_assistant(loop):
|
|||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def mock_async_start():
|
def mock_async_start():
|
||||||
with patch.object(loop, 'add_signal_handler'), \
|
with patch.object(loop, 'add_signal_handler'), \
|
||||||
patch('homeassistant.core._async_create_timer'):
|
patch('homeassistant.core._async_create_timer'), \
|
||||||
|
patch.object(hass, '_async_tasks_cleanup', return_value=None):
|
||||||
yield from orig_start()
|
yield from orig_start()
|
||||||
|
|
||||||
hass.async_start = mock_async_start
|
hass.async_start = mock_async_start
|
||||||
|
@ -9,6 +9,8 @@ import pytz
|
|||||||
|
|
||||||
import homeassistant.core as ha
|
import homeassistant.core as ha
|
||||||
from homeassistant.exceptions import InvalidEntityFormatError
|
from homeassistant.exceptions import InvalidEntityFormatError
|
||||||
|
from homeassistant.util.async import (
|
||||||
|
run_callback_threadsafe, run_coroutine_threadsafe)
|
||||||
import homeassistant.util.dt as dt_util
|
import homeassistant.util.dt as dt_util
|
||||||
from homeassistant.util.unit_system import (METRIC_SYSTEM)
|
from homeassistant.util.unit_system import (METRIC_SYSTEM)
|
||||||
from homeassistant.const import (
|
from homeassistant.const import (
|
||||||
@ -118,7 +120,7 @@ class TestHomeAssistant(unittest.TestCase):
|
|||||||
|
|
||||||
# self.assertEqual(1, len(calls))
|
# self.assertEqual(1, len(calls))
|
||||||
|
|
||||||
def test_async_add_job_pending_tasks_add(self):
|
def test_pending_sheduler(self):
|
||||||
"""Add a coro to pending tasks."""
|
"""Add a coro to pending tasks."""
|
||||||
call_count = []
|
call_count = []
|
||||||
|
|
||||||
@ -127,14 +129,24 @@ class TestHomeAssistant(unittest.TestCase):
|
|||||||
"""Test Coro."""
|
"""Test Coro."""
|
||||||
call_count.append('call')
|
call_count.append('call')
|
||||||
|
|
||||||
self.hass.add_job(test_coro())
|
for i in range(50):
|
||||||
|
self.hass.add_job(test_coro())
|
||||||
|
|
||||||
assert len(self.hass._pending_tasks) == 1
|
run_coroutine_threadsafe(
|
||||||
self.hass.block_till_done()
|
asyncio.wait(self.hass._pending_tasks, loop=self.hass.loop),
|
||||||
assert len(call_count) == 1
|
loop=self.hass.loop
|
||||||
|
).result()
|
||||||
|
|
||||||
def test_async_add_job_pending_tasks_cleanup(self):
|
with patch.object(self.hass.loop, 'call_later') as mock_later:
|
||||||
"""Add a coro to pending tasks and test cleanup."""
|
run_callback_threadsafe(
|
||||||
|
self.hass.loop, self.hass._async_tasks_cleanup).result()
|
||||||
|
assert mock_later.called
|
||||||
|
|
||||||
|
assert len(self.hass._pending_tasks) == 0
|
||||||
|
assert len(call_count) == 50
|
||||||
|
|
||||||
|
def test_async_add_job_pending_tasks_coro(self):
|
||||||
|
"""Add a coro to pending tasks."""
|
||||||
call_count = []
|
call_count = []
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
@ -149,12 +161,6 @@ class TestHomeAssistant(unittest.TestCase):
|
|||||||
self.hass.block_till_done()
|
self.hass.block_till_done()
|
||||||
assert len(call_count) == 50
|
assert len(call_count) == 50
|
||||||
|
|
||||||
self.hass.add_job(test_coro())
|
|
||||||
|
|
||||||
assert len(self.hass._pending_tasks) == 1
|
|
||||||
self.hass.block_till_done()
|
|
||||||
assert len(call_count) == 51
|
|
||||||
|
|
||||||
def test_async_add_job_pending_tasks_executor(self):
|
def test_async_add_job_pending_tasks_executor(self):
|
||||||
"""Run a executor in pending tasks."""
|
"""Run a executor in pending tasks."""
|
||||||
call_count = []
|
call_count = []
|
||||||
|
Loading…
x
Reference in New Issue
Block a user