diff --git a/homeassistant/core.py b/homeassistant/core.py index 47a21f3325b..0f038149d63 100644 --- a/homeassistant/core.py +++ b/homeassistant/core.py @@ -95,6 +95,7 @@ from .util.async_ import ( run_callback_threadsafe, shutdown_run_callback_threadsafe, ) +from .util.executor import InterruptibleThreadPoolExecutor from .util.json import JsonObjectType from .util.read_only_dict import ReadOnlyDict from .util.timeout import TimeoutManager @@ -394,6 +395,9 @@ class HomeAssistant: self.timeout: TimeoutManager = TimeoutManager() self._stop_future: concurrent.futures.Future[None] | None = None self._shutdown_jobs: list[HassJobWithArgs] = [] + self.import_executor = InterruptibleThreadPoolExecutor( + max_workers=1, thread_name_prefix="ImportExecutor" + ) @cached_property def is_running(self) -> bool: @@ -678,6 +682,16 @@ class HomeAssistant: return task + @callback + def async_add_import_executor_job( + self, target: Callable[..., _T], *args: Any + ) -> asyncio.Future[_T]: + """Add an import executor job from within the event loop.""" + task = self.loop.run_in_executor(self.import_executor, target, *args) + self._tasks.add(task) + task.add_done_callback(self._tasks.remove) + return task + @overload @callback def async_run_hass_job( @@ -992,6 +1006,7 @@ class HomeAssistant: self._async_log_running_tasks("close") self.set_state(CoreState.stopped) + self.import_executor.shutdown() if self._stopped is not None: self._stopped.set() diff --git a/homeassistant/loader.py b/homeassistant/loader.py index 1ff98ff6ff2..6c736bf8c4d 100644 --- a/homeassistant/loader.py +++ b/homeassistant/loader.py @@ -853,7 +853,7 @@ class Integration: # So we do it before validating config to catch these errors. if load_executor: try: - comp = await self.hass.async_add_executor_job(self.get_component) + comp = await self.hass.async_add_import_executor_job(self.get_component) except ImportError as ex: load_executor = False _LOGGER.debug("Failed to import %s in executor", domain, exc_info=ex) @@ -924,7 +924,7 @@ class Integration: try: if load_executor: try: - platform = await self.hass.async_add_executor_job( + platform = await self.hass.async_add_import_executor_job( self._load_platform, platform_name ) except ImportError as ex: diff --git a/tests/components/zwave_js/test_update.py b/tests/components/zwave_js/test_update.py index ed42363ca41..1774254a3c5 100644 --- a/tests/components/zwave_js/test_update.py +++ b/tests/components/zwave_js/test_update.py @@ -310,7 +310,7 @@ async def test_update_entity_ha_not_running( hass_ws_client: WebSocketGenerator, ) -> None: """Test update occurs only after HA is running.""" - await hass.async_stop() + hass.set_state(CoreState.not_running) client.async_send_command.return_value = {"updates": []} @@ -632,7 +632,7 @@ async def test_update_entity_delay( """Test update occurs on a delay after HA starts.""" client.async_send_command.reset_mock() client.async_send_command.return_value = {"updates": []} - await hass.async_stop() + hass.set_state(CoreState.not_running) entry = MockConfigEntry(domain="zwave_js", data={"url": "ws://test.org"}) entry.add_to_hass(hass) diff --git a/tests/test_core.py b/tests/test_core.py index 3a2c34c5e1c..987f228bea8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2869,3 +2869,19 @@ def test_one_time_listener_repr(hass: HomeAssistant) -> None: assert "OneTimeListener" in repr_str assert "test_core" in repr_str assert "_listener" in repr_str + + +async def test_async_add_import_executor_job(hass: HomeAssistant) -> None: + """Test async_add_import_executor_job works and is limited to one thread.""" + evt = threading.Event() + loop = asyncio.get_running_loop() + + def executor_func() -> None: + evt.set() + return evt + + future = hass.async_add_import_executor_job(executor_func) + await loop.run_in_executor(None, evt.wait) + assert await future is evt + + assert hass.import_executor._max_workers == 1