From ac634d71f4cc75946012e51f955c6f5292da2aee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ville=20Skytt=C3=A4?= Date: Sat, 28 Sep 2019 03:02:48 +0300 Subject: [PATCH] Remove no longer needed Python < 3.6 compatibility code (#27024) --- homeassistant/util/async_.py | 130 +---------------------------------- 1 file changed, 3 insertions(+), 127 deletions(-) diff --git a/homeassistant/util/async_.py b/homeassistant/util/async_.py index d43658d1584..6920e0d97f6 100644 --- a/homeassistant/util/async_.py +++ b/homeassistant/util/async_.py @@ -1,23 +1,13 @@ -"""Asyncio backports for Python 3.4.3 compatibility.""" +"""Asyncio backports for Python 3.6 compatibility.""" import concurrent.futures import threading import logging from asyncio import coroutines from asyncio.events import AbstractEventLoop -from asyncio.futures import Future import asyncio from asyncio import ensure_future -from typing import ( - Any, - Union, - Coroutine, - Callable, - Generator, - TypeVar, - Awaitable, - Optional, -) +from typing import Any, Union, Coroutine, Callable, Generator, TypeVar, Awaitable _LOGGER = logging.getLogger(__name__) @@ -40,105 +30,6 @@ except AttributeError: loop.close() -def _set_result_unless_cancelled(fut: Future, result: Any) -> None: - """Set the result only if the Future was not cancelled.""" - if fut.cancelled(): - return - fut.set_result(result) - - -def _set_concurrent_future_state( - concurr: concurrent.futures.Future, source: Union[concurrent.futures.Future, Future] -) -> None: - """Copy state from a future to a concurrent.futures.Future.""" - assert source.done() - if source.cancelled(): - concurr.cancel() - if not concurr.set_running_or_notify_cancel(): - return - exception = source.exception() - if exception is not None: - concurr.set_exception(exception) - else: - result = source.result() - concurr.set_result(result) - - -def _copy_future_state( - source: Union[concurrent.futures.Future, Future], - dest: Union[concurrent.futures.Future, Future], -) -> None: - """Copy state from another Future. - - The other Future may be a concurrent.futures.Future. - """ - assert source.done() - if dest.cancelled(): - return - assert not dest.done() - if source.cancelled(): - dest.cancel() - else: - exception = source.exception() - if exception is not None: - dest.set_exception(exception) - else: - result = source.result() - dest.set_result(result) - - -def _chain_future( - source: Union[concurrent.futures.Future, Future], - destination: Union[concurrent.futures.Future, Future], -) -> None: - """Chain two futures so that when one completes, so does the other. - - The result (or exception) of source will be copied to destination. - If destination is cancelled, source gets cancelled too. - Compatible with both asyncio.Future and concurrent.futures.Future. - """ - if not isinstance(source, (Future, concurrent.futures.Future)): - raise TypeError("A future is required for source argument") - if not isinstance(destination, (Future, concurrent.futures.Future)): - raise TypeError("A future is required for destination argument") - # pylint: disable=protected-access - if isinstance(source, Future): - source_loop: Optional[AbstractEventLoop] = source._loop - else: - source_loop = None - if isinstance(destination, Future): - dest_loop: Optional[AbstractEventLoop] = destination._loop - else: - dest_loop = None - - def _set_state( - future: Union[concurrent.futures.Future, Future], - other: Union[concurrent.futures.Future, Future], - ) -> None: - if isinstance(future, Future): - _copy_future_state(other, future) - else: - _set_concurrent_future_state(future, other) - - def _call_check_cancel( - destination: Union[concurrent.futures.Future, Future] - ) -> None: - if destination.cancelled(): - if source_loop is None or source_loop is dest_loop: - source.cancel() - else: - source_loop.call_soon_threadsafe(source.cancel) - - def _call_set_state(source: Union[concurrent.futures.Future, Future]) -> None: - if dest_loop is None or dest_loop is source_loop: - _set_state(destination, source) - else: - dest_loop.call_soon_threadsafe(_set_state, destination, source) - - destination.add_done_callback(_call_check_cancel) - source.add_done_callback(_call_set_state) - - def run_coroutine_threadsafe( coro: Union[Coroutine, Generator], loop: AbstractEventLoop ) -> concurrent.futures.Future: @@ -150,22 +41,7 @@ def run_coroutine_threadsafe( if ident is not None and ident == threading.get_ident(): raise RuntimeError("Cannot be called from within the event loop") - if not coroutines.iscoroutine(coro): - raise TypeError("A coroutine object is required") - future: concurrent.futures.Future = concurrent.futures.Future() - - def callback() -> None: - """Handle the call to the coroutine.""" - try: - _chain_future(ensure_future(coro, loop=loop), future) - except Exception as exc: # pylint: disable=broad-except - if future.set_running_or_notify_cancel(): - future.set_exception(exc) - else: - _LOGGER.warning("Exception on lost future: ", exc_info=True) - - loop.call_soon_threadsafe(callback) - return future + return asyncio.run_coroutine_threadsafe(coro, loop) def fire_coroutine_threadsafe(coro: Coroutine, loop: AbstractEventLoop) -> None: