Cleanup coroutine threadsafe (#27080)

* Cleanup coroutine threadsafe

* fix lint

* Fix typing

* Fix tests

* Fix black
This commit is contained in:
Pascal Vizeli 2019-10-01 16:59:06 +02:00 committed by GitHub
parent f4a1f2809b
commit c1851a2d94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 196 additions and 232 deletions

View File

@ -1,4 +1,5 @@
"""Tracking for bluetooth low energy devices."""
import asyncio
import logging
from homeassistant.helpers.event import track_point_in_utc_time
@ -14,7 +15,6 @@ from homeassistant.components.device_tracker.const import (
)
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
import homeassistant.util.dt as dt_util
from homeassistant.util.async_ import run_coroutine_threadsafe
_LOGGER = logging.getLogger(__name__)
@ -89,7 +89,7 @@ def setup_scanner(hass, config, see, discovery_info=None):
# Load all known devices.
# We just need the devices so set consider_home and home range
# to 0
for device in run_coroutine_threadsafe(
for device in asyncio.run_coroutine_threadsafe(
async_load_config(yaml_path, hass, 0), hass.loop
).result():
# check if device is a valid bluetooth device

View File

@ -26,7 +26,6 @@ from homeassistant.components.camera import (
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers import config_validation as cv
from homeassistant.util.async_ import run_coroutine_threadsafe
_LOGGER = logging.getLogger(__name__)
@ -105,7 +104,7 @@ class GenericCamera(Camera):
def camera_image(self):
"""Return bytes of camera image."""
return run_coroutine_threadsafe(
return asyncio.run_coroutine_threadsafe(
self.async_camera_image(), self.hass.loop
).result()

View File

@ -34,7 +34,6 @@ from homeassistant.helpers.event import async_track_state_change
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.config_validation import ENTITY_SERVICE_SCHEMA
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util.async_ import run_coroutine_threadsafe
# mypy: allow-untyped-calls, allow-untyped-defs
@ -430,7 +429,7 @@ class Group(Entity):
mode=None,
):
"""Initialize a group."""
return run_coroutine_threadsafe(
return asyncio.run_coroutine_threadsafe(
Group.async_create_group(
hass,
name,
@ -546,7 +545,7 @@ class Group(Entity):
def update_tracked_entity_ids(self, entity_ids):
"""Update the member entity IDs."""
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.async_update_tracked_entity_ids(entity_ids), self.hass.loop
).result()

View File

@ -39,7 +39,7 @@ from homeassistant.helpers import config_validation as cv, template
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import ConfigType, HomeAssistantType, ServiceDataType
from homeassistant.loader import bind_hass
from homeassistant.util.async_ import run_callback_threadsafe, run_coroutine_threadsafe
from homeassistant.util.async_ import run_callback_threadsafe
from homeassistant.util.logging import catch_log_exception
# Loading the config flow file will register the flow
@ -463,7 +463,7 @@ def subscribe(
encoding: str = "utf-8",
) -> Callable[[], None]:
"""Subscribe to an MQTT topic."""
async_remove = run_coroutine_threadsafe(
async_remove = asyncio.run_coroutine_threadsafe(
async_subscribe(hass, topic, msg_callback, qos, encoding), hass.loop
).result()

View File

@ -9,7 +9,6 @@ from homeassistant.components.camera import PLATFORM_SCHEMA, Camera
from homeassistant.const import CONF_ENTITY_ID, CONF_NAME, CONF_MODE
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.util.async_ import run_coroutine_threadsafe
import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__)
@ -220,7 +219,7 @@ class ProxyCamera(Camera):
def camera_image(self):
"""Return camera image."""
return run_coroutine_threadsafe(
return asyncio.run_coroutine_threadsafe(
self.async_camera_image(), self.hass.loop
).result()

View File

@ -64,11 +64,7 @@ from homeassistant.exceptions import (
Unauthorized,
ServiceNotFound,
)
from homeassistant.util.async_ import (
run_coroutine_threadsafe,
run_callback_threadsafe,
fire_coroutine_threadsafe,
)
from homeassistant.util.async_ import run_callback_threadsafe, fire_coroutine_threadsafe
from homeassistant import util
import homeassistant.util.dt as dt_util
from homeassistant.util import location, slugify
@ -375,7 +371,9 @@ class HomeAssistant:
def block_till_done(self) -> None:
"""Block till all pending work is done."""
run_coroutine_threadsafe(self.async_block_till_done(), self.loop).result()
asyncio.run_coroutine_threadsafe(
self.async_block_till_done(), self.loop
).result()
async def async_block_till_done(self) -> None:
"""Block till all pending work is done."""
@ -1168,7 +1166,7 @@ class ServiceRegistry:
Because the service is sent as an event you are not allowed to use
the keys ATTR_DOMAIN and ATTR_SERVICE in your service_data.
"""
return run_coroutine_threadsafe( # type: ignore
return asyncio.run_coroutine_threadsafe(
self.async_call(domain, service, service_data, blocking, context),
self._hass.loop,
).result()

View File

@ -6,7 +6,7 @@ from typing import Optional
from homeassistant.const import DEVICE_DEFAULT_NAME
from homeassistant.core import callback, valid_entity_id, split_entity_id
from homeassistant.exceptions import HomeAssistantError, PlatformNotReady
from homeassistant.util.async_ import run_callback_threadsafe, run_coroutine_threadsafe
from homeassistant.util.async_ import run_callback_threadsafe
from .entity_registry import DISABLED_INTEGRATION
from .event import async_track_time_interval, async_call_later
@ -220,7 +220,7 @@ class EntityPlatform:
"only inside tests or you can run into a deadlock!"
)
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.async_add_entities(list(new_entities), update_before_add),
self.hass.loop,
).result()

View File

@ -1,5 +1,5 @@
"""Helpers to execute scripts."""
import asyncio
import logging
from contextlib import suppress
from datetime import datetime
@ -29,7 +29,7 @@ from homeassistant.helpers.event import (
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_integration
import homeassistant.util.dt as date_util
from homeassistant.util.async_ import run_coroutine_threadsafe, run_callback_threadsafe
from homeassistant.util.async_ import run_callback_threadsafe
# mypy: allow-untyped-calls, allow-untyped-defs, no-check-untyped-defs
@ -136,7 +136,7 @@ class Script:
def run(self, variables=None, context=None):
"""Run script."""
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.async_run(variables, context), self.hass.loop
).result()

View File

@ -20,7 +20,6 @@ from homeassistant.loader import async_get_integration, bind_hass
from homeassistant.util.yaml import load_yaml
from homeassistant.util.yaml.loader import JSON_TYPE
import homeassistant.helpers.config_validation as cv
from homeassistant.util.async_ import run_coroutine_threadsafe
from homeassistant.helpers.typing import HomeAssistantType
@ -42,7 +41,7 @@ def call_from_config(
hass, config, blocking=False, variables=None, validate_config=True
):
"""Call a service based on a config hash."""
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
async_call_from_config(hass, config, blocking, variables, validate_config),
hass.loop,
).result()
@ -105,7 +104,7 @@ def extract_entity_ids(hass, service_call, expand_group=True):
Will convert group entity ids to the entity ids it represents.
"""
return run_coroutine_threadsafe(
return asyncio.run_coroutine_threadsafe(
async_extract_entity_ids(hass, service_call, expand_group), hass.loop
).result()

View File

@ -44,7 +44,6 @@ from homeassistant.const import (
SERVICE_SELECT_OPTION,
)
from homeassistant.core import Context, State, DOMAIN as HASS_DOMAIN
from homeassistant.util.async_ import run_coroutine_threadsafe
from .typing import HomeAssistantType
_LOGGER = logging.getLogger(__name__)
@ -122,7 +121,7 @@ def reproduce_state(
blocking: bool = False,
) -> None:
"""Reproduce given state."""
return run_coroutine_threadsafe( # type: ignore
return asyncio.run_coroutine_threadsafe(
async_reproduce_state(hass, states, blocking), hass.loop
).result()

View File

@ -10,7 +10,6 @@ from homeassistant import requirements, core, loader, config as conf_util
from homeassistant.config import async_notify_setup_error
from homeassistant.const import EVENT_COMPONENT_LOADED, PLATFORM_FORMAT
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.async_ import run_coroutine_threadsafe
_LOGGER = logging.getLogger(__name__)
@ -25,7 +24,7 @@ SLOW_SETUP_WARNING = 10
def setup_component(hass: core.HomeAssistant, domain: str, config: Dict) -> bool:
"""Set up a component and all its dependencies."""
return run_coroutine_threadsafe( # type: ignore
return asyncio.run_coroutine_threadsafe(
async_setup_component(hass, domain, config), hass.loop
).result()

View File

@ -7,7 +7,7 @@ from asyncio.events import AbstractEventLoop
import asyncio
from asyncio import ensure_future
from typing import Any, Union, Coroutine, Callable, Generator, TypeVar, Awaitable
from typing import Any, Coroutine, Callable, TypeVar, Awaitable
_LOGGER = logging.getLogger(__name__)
@ -30,20 +30,6 @@ except AttributeError:
loop.close()
def run_coroutine_threadsafe(
coro: Union[Coroutine, Generator], loop: AbstractEventLoop
) -> concurrent.futures.Future:
"""Submit a coroutine object to a given event loop.
Return a concurrent.futures.Future to access the result.
"""
ident = loop.__dict__.get("_thread_ident")
if ident is not None and ident == threading.get_ident():
raise RuntimeError("Cannot be called from within the event loop")
return asyncio.run_coroutine_threadsafe(coro, loop)
def fire_coroutine_threadsafe(coro: Coroutine, loop: AbstractEventLoop) -> None:
"""Submit a coroutine object to a given event loop.

View File

@ -8,8 +8,6 @@ import threading
import traceback
from typing import Any, Callable, Coroutine, Optional
from .async_ import run_coroutine_threadsafe
class HideSensitiveDataFilter(logging.Filter):
"""Filter API password calls."""
@ -83,7 +81,9 @@ class AsyncHandler:
def _process(self) -> None:
"""Process log in a thread."""
while True:
record = run_coroutine_threadsafe(self._queue.get(), self.loop).result()
record = asyncio.run_coroutine_threadsafe(
self._queue.get(), self.loop
).result()
if record is None:
self.handler.close()

View File

@ -53,7 +53,7 @@ from homeassistant.helpers import (
from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import async_setup_component, setup_component
from homeassistant.util.unit_system import METRIC_SYSTEM
from homeassistant.util.async_ import run_callback_threadsafe, run_coroutine_threadsafe
from homeassistant.util.async_ import run_callback_threadsafe
from homeassistant.components.device_automation import ( # noqa
_async_get_device_automations as async_get_device_automations,
)
@ -92,7 +92,9 @@ def threadsafe_coroutine_factory(func):
def threadsafe(*args, **kwargs):
"""Call func threadsafe."""
hass = args[0]
return run_coroutine_threadsafe(func(*args, **kwargs), hass.loop).result()
return asyncio.run_coroutine_threadsafe(
func(*args, **kwargs), hass.loop
).result()
return threadsafe
@ -125,7 +127,7 @@ def get_test_home_assistant():
def start_hass(*mocks):
"""Start hass."""
run_coroutine_threadsafe(hass.async_start(), loop).result()
asyncio.run_coroutine_threadsafe(hass.async_start(), loop).result()
def stop_hass():
"""Stop hass."""

View File

@ -17,7 +17,6 @@ from homeassistant.components.camera.const import DOMAIN, PREF_PRELOAD_STREAM
from homeassistant.components.camera.prefs import CameraEntityPreferences
from homeassistant.components.websocket_api.const import TYPE_RESULT
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util.async_ import run_coroutine_threadsafe
from tests.common import (
get_test_home_assistant,
@ -110,7 +109,7 @@ class TestGetImage:
"""Grab an image from camera entity."""
self.hass.start()
image = run_coroutine_threadsafe(
image = asyncio.run_coroutine_threadsafe(
camera.async_get_image(self.hass, "camera.demo_camera"), self.hass.loop
).result()
@ -123,7 +122,7 @@ class TestGetImage:
"homeassistant.helpers.entity_component.EntityComponent." "get_entity",
return_value=None,
), pytest.raises(HomeAssistantError):
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
camera.async_get_image(self.hass, "camera.demo_camera"), self.hass.loop
).result()
@ -133,7 +132,7 @@ class TestGetImage:
"homeassistant.components.camera.Camera.async_camera_image",
side_effect=asyncio.TimeoutError,
), pytest.raises(HomeAssistantError):
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
camera.async_get_image(self.hass, "camera.demo_camera"), self.hass.loop
).result()
@ -143,7 +142,7 @@ class TestGetImage:
"homeassistant.components.camera.Camera.async_camera_image",
return_value=mock_coro(None),
), pytest.raises(HomeAssistantError):
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
camera.async_get_image(self.hass, "camera.demo_camera"), self.hass.loop
).result()

View File

@ -1,4 +1,5 @@
"""The tests for the notify.group platform."""
import asyncio
import unittest
from unittest.mock import MagicMock, patch
@ -6,7 +7,6 @@ from homeassistant.setup import setup_component
import homeassistant.components.notify as notify
import homeassistant.components.group.notify as group
import homeassistant.components.demo.notify as demo
from homeassistant.util.async_ import run_coroutine_threadsafe
from tests.common import assert_setup_component, get_test_home_assistant
@ -43,7 +43,7 @@ class TestNotifyGroup(unittest.TestCase):
},
)
self.service = run_coroutine_threadsafe(
self.service = asyncio.run_coroutine_threadsafe(
group.async_get_service(
self.hass,
{
@ -70,7 +70,7 @@ class TestNotifyGroup(unittest.TestCase):
def test_send_message_with_data(self):
"""Test sending a message with to a notify group."""
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.service.async_send_message(
"Hello", title="Test notification", data={"hello": "world"}
),

View File

@ -1,5 +1,6 @@
"""The tests for Core components."""
# pylint: disable=protected-access
import asyncio
import unittest
from unittest.mock import patch, Mock
@ -27,7 +28,6 @@ from homeassistant.components.homeassistant import (
import homeassistant.helpers.intent as intent
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity
from homeassistant.util.async_ import run_coroutine_threadsafe
from tests.common import (
get_test_home_assistant,
@ -111,7 +111,7 @@ class TestComponentsCore(unittest.TestCase):
def setUp(self):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
assert run_coroutine_threadsafe(
assert asyncio.run_coroutine_threadsafe(
async_setup_component(self.hass, "homeassistant", {}), self.hass.loop
).result()

View File

@ -10,7 +10,6 @@ from homeassistant.const import (
STATE_OFF,
STATE_IDLE,
)
from homeassistant.util.async_ import run_coroutine_threadsafe
from tests.common import get_test_home_assistant
@ -162,21 +161,23 @@ class TestAsyncMediaPlayer(unittest.TestCase):
def test_volume_up(self):
"""Test the volume_up helper function."""
assert self.player.volume_level == 0
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_set_volume_level(0.5), self.hass.loop
).result()
assert self.player.volume_level == 0.5
run_coroutine_threadsafe(self.player.async_volume_up(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.player.async_volume_up(), self.hass.loop
).result()
assert self.player.volume_level == 0.6
def test_volume_down(self):
"""Test the volume_down helper function."""
assert self.player.volume_level == 0
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_set_volume_level(0.5), self.hass.loop
).result()
assert self.player.volume_level == 0.5
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_volume_down(), self.hass.loop
).result()
assert self.player.volume_level == 0.4
@ -184,11 +185,11 @@ class TestAsyncMediaPlayer(unittest.TestCase):
def test_media_play_pause(self):
"""Test the media_play_pause helper function."""
assert self.player.state == STATE_OFF
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_media_play_pause(), self.hass.loop
).result()
assert self.player.state == STATE_PLAYING
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_media_play_pause(), self.hass.loop
).result()
assert self.player.state == STATE_PAUSED
@ -196,9 +197,13 @@ class TestAsyncMediaPlayer(unittest.TestCase):
def test_toggle(self):
"""Test the toggle helper function."""
assert self.player.state == STATE_OFF
run_coroutine_threadsafe(self.player.async_toggle(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.player.async_toggle(), self.hass.loop
).result()
assert self.player.state == STATE_ON
run_coroutine_threadsafe(self.player.async_toggle(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.player.async_toggle(), self.hass.loop
).result()
assert self.player.state == STATE_OFF
@ -219,7 +224,9 @@ class TestSyncMediaPlayer(unittest.TestCase):
assert self.player.volume_level == 0
self.player.set_volume_level(0.5)
assert self.player.volume_level == 0.5
run_coroutine_threadsafe(self.player.async_volume_up(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.player.async_volume_up(), self.hass.loop
).result()
assert self.player.volume_level == 0.7
def test_volume_down(self):
@ -227,7 +234,7 @@ class TestSyncMediaPlayer(unittest.TestCase):
assert self.player.volume_level == 0
self.player.set_volume_level(0.5)
assert self.player.volume_level == 0.5
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_volume_down(), self.hass.loop
).result()
assert self.player.volume_level == 0.3
@ -235,11 +242,11 @@ class TestSyncMediaPlayer(unittest.TestCase):
def test_media_play_pause(self):
"""Test the media_play_pause helper function."""
assert self.player.state == STATE_OFF
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_media_play_pause(), self.hass.loop
).result()
assert self.player.state == STATE_PLAYING
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
self.player.async_media_play_pause(), self.hass.loop
).result()
assert self.player.state == STATE_PAUSED
@ -247,7 +254,11 @@ class TestSyncMediaPlayer(unittest.TestCase):
def test_toggle(self):
"""Test the toggle helper function."""
assert self.player.state == STATE_OFF
run_coroutine_threadsafe(self.player.async_toggle(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.player.async_toggle(), self.hass.loop
).result()
assert self.player.state == STATE_ON
run_coroutine_threadsafe(self.player.async_toggle(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.player.async_toggle(), self.hass.loop
).result()
assert self.player.state == STATE_OFF

View File

@ -5,7 +5,6 @@ import aiohttp
import homeassistant.components.rest.switch as rest
from homeassistant.setup import setup_component
from homeassistant.util.async_ import run_coroutine_threadsafe
from homeassistant.helpers.template import Template
from tests.common import get_test_home_assistant, assert_setup_component
@ -23,14 +22,14 @@ class TestRestSwitchSetup:
def test_setup_missing_config(self):
"""Test setup with configuration missing required entries."""
assert not run_coroutine_threadsafe(
assert not asyncio.run_coroutine_threadsafe(
rest.async_setup_platform(self.hass, {"platform": "rest"}, None),
self.hass.loop,
).result()
def test_setup_missing_schema(self):
"""Test setup with resource missing schema."""
assert not run_coroutine_threadsafe(
assert not asyncio.run_coroutine_threadsafe(
rest.async_setup_platform(
self.hass, {"platform": "rest", "resource": "localhost"}, None
),
@ -40,7 +39,7 @@ class TestRestSwitchSetup:
def test_setup_failed_connect(self, aioclient_mock):
"""Test setup when connection error occurs."""
aioclient_mock.get("http://localhost", exc=aiohttp.ClientError)
assert not run_coroutine_threadsafe(
assert not asyncio.run_coroutine_threadsafe(
rest.async_setup_platform(
self.hass, {"platform": "rest", "resource": "http://localhost"}, None
),
@ -50,7 +49,7 @@ class TestRestSwitchSetup:
def test_setup_timeout(self, aioclient_mock):
"""Test setup when connection timeout occurs."""
aioclient_mock.get("http://localhost", exc=asyncio.TimeoutError())
assert not run_coroutine_threadsafe(
assert not asyncio.run_coroutine_threadsafe(
rest.async_setup_platform(
self.hass, {"platform": "rest", "resource": "http://localhost"}, None
),
@ -131,7 +130,9 @@ class TestRestSwitch:
def test_turn_on_success(self, aioclient_mock):
"""Test turn_on."""
aioclient_mock.post(self.resource, status=200)
run_coroutine_threadsafe(self.switch.async_turn_on(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_turn_on(), self.hass.loop
).result()
assert self.body_on.template == aioclient_mock.mock_calls[-1][2].decode()
assert self.switch.is_on
@ -139,7 +140,9 @@ class TestRestSwitch:
def test_turn_on_status_not_ok(self, aioclient_mock):
"""Test turn_on when error status returned."""
aioclient_mock.post(self.resource, status=500)
run_coroutine_threadsafe(self.switch.async_turn_on(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_turn_on(), self.hass.loop
).result()
assert self.body_on.template == aioclient_mock.mock_calls[-1][2].decode()
assert self.switch.is_on is None
@ -147,14 +150,18 @@ class TestRestSwitch:
def test_turn_on_timeout(self, aioclient_mock):
"""Test turn_on when timeout occurs."""
aioclient_mock.post(self.resource, status=500)
run_coroutine_threadsafe(self.switch.async_turn_on(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_turn_on(), self.hass.loop
).result()
assert self.switch.is_on is None
def test_turn_off_success(self, aioclient_mock):
"""Test turn_off."""
aioclient_mock.post(self.resource, status=200)
run_coroutine_threadsafe(self.switch.async_turn_off(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_turn_off(), self.hass.loop
).result()
assert self.body_off.template == aioclient_mock.mock_calls[-1][2].decode()
assert not self.switch.is_on
@ -162,7 +169,9 @@ class TestRestSwitch:
def test_turn_off_status_not_ok(self, aioclient_mock):
"""Test turn_off when error status returned."""
aioclient_mock.post(self.resource, status=500)
run_coroutine_threadsafe(self.switch.async_turn_off(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_turn_off(), self.hass.loop
).result()
assert self.body_off.template == aioclient_mock.mock_calls[-1][2].decode()
assert self.switch.is_on is None
@ -170,34 +179,44 @@ class TestRestSwitch:
def test_turn_off_timeout(self, aioclient_mock):
"""Test turn_off when timeout occurs."""
aioclient_mock.post(self.resource, exc=asyncio.TimeoutError())
run_coroutine_threadsafe(self.switch.async_turn_on(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_turn_on(), self.hass.loop
).result()
assert self.switch.is_on is None
def test_update_when_on(self, aioclient_mock):
"""Test update when switch is on."""
aioclient_mock.get(self.resource, text=self.body_on.template)
run_coroutine_threadsafe(self.switch.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_update(), self.hass.loop
).result()
assert self.switch.is_on
def test_update_when_off(self, aioclient_mock):
"""Test update when switch is off."""
aioclient_mock.get(self.resource, text=self.body_off.template)
run_coroutine_threadsafe(self.switch.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_update(), self.hass.loop
).result()
assert not self.switch.is_on
def test_update_when_unknown(self, aioclient_mock):
"""Test update when unknown status returned."""
aioclient_mock.get(self.resource, text="unknown status")
run_coroutine_threadsafe(self.switch.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_update(), self.hass.loop
).result()
assert self.switch.is_on is None
def test_update_timeout(self, aioclient_mock):
"""Test update when timeout occurs."""
aioclient_mock.get(self.resource, exc=asyncio.TimeoutError())
run_coroutine_threadsafe(self.switch.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
self.switch.async_update(), self.hass.loop
).result()
assert self.switch.is_on is None

View File

@ -1,4 +1,5 @@
"""The tests for the Universal Media player platform."""
import asyncio
from copy import copy
import unittest
@ -10,7 +11,6 @@ import homeassistant.components.input_number as input_number
import homeassistant.components.input_select as input_select
import homeassistant.components.media_player as media_player
import homeassistant.components.universal.media_player as universal
from homeassistant.util.async_ import run_coroutine_threadsafe
from tests.common import mock_service, get_test_home_assistant
@ -298,7 +298,7 @@ class TestMediaPlayer(unittest.TestCase):
setup_ok = True
try:
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
universal.async_setup_platform(
self.hass, validate_config(bad_config), add_entities
),
@ -309,7 +309,7 @@ class TestMediaPlayer(unittest.TestCase):
assert not setup_ok
assert 0 == len(entities)
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
universal.async_setup_platform(
self.hass, validate_config(config), add_entities
),
@ -369,26 +369,26 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert ump._child_state is None
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert self.mock_mp_1.entity_id == ump._child_state.entity_id
self.mock_mp_2._state = STATE_PLAYING
self.mock_mp_2.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert self.mock_mp_1.entity_id == ump._child_state.entity_id
self.mock_mp_1._state = STATE_OFF
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert self.mock_mp_2.entity_id == ump._child_state.entity_id
def test_name(self):
@ -413,14 +413,14 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert ump.state, STATE_OFF
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert STATE_PLAYING == ump.state
def test_state_with_children_and_attrs(self):
@ -429,22 +429,22 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert STATE_OFF == ump.state
self.hass.states.set(self.mock_state_switch_id, STATE_ON)
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert STATE_ON == ump.state
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert STATE_PLAYING == ump.state
self.hass.states.set(self.mock_state_switch_id, STATE_OFF)
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert STATE_OFF == ump.state
def test_volume_level(self):
@ -453,20 +453,20 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert ump.volume_level is None
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert 0 == ump.volume_level
self.mock_mp_1._volume_level = 1
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert 1 == ump.volume_level
def test_media_image_url(self):
@ -476,7 +476,7 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert ump.media_image_url is None
@ -484,7 +484,7 @@ class TestMediaPlayer(unittest.TestCase):
self.mock_mp_1._media_image_url = test_url
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
# mock_mp_1 will convert the url to the api proxy url. This test
# ensures ump passes through the same url without an additional proxy.
assert self.mock_mp_1.entity_picture == ump.entity_picture
@ -495,20 +495,20 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert not ump.is_volume_muted
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert not ump.is_volume_muted
self.mock_mp_1._is_volume_muted = True
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert ump.is_volume_muted
def test_source_list_children_and_attr(self):
@ -561,7 +561,7 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert 0 == ump.supported_features
@ -569,7 +569,7 @@ class TestMediaPlayer(unittest.TestCase):
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
assert 512 == ump.supported_features
def test_supported_features_children_and_cmds(self):
@ -590,12 +590,12 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
self.mock_mp_1._state = STATE_PLAYING
self.mock_mp_1.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
check_flags = (
universal.SUPPORT_TURN_ON
@ -615,16 +615,16 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
self.mock_mp_1._state = STATE_OFF
self.mock_mp_1.schedule_update_ha_state()
self.mock_mp_2._state = STATE_OFF
self.mock_mp_2.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
run_coroutine_threadsafe(ump.async_turn_off(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_turn_off(), self.hass.loop).result()
assert 0 == len(self.mock_mp_1.service_calls["turn_off"])
assert 0 == len(self.mock_mp_2.service_calls["turn_off"])
@ -634,67 +634,85 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
self.mock_mp_2._state = STATE_PLAYING
self.mock_mp_2.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
run_coroutine_threadsafe(ump.async_turn_off(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_turn_off(), self.hass.loop).result()
assert 1 == len(self.mock_mp_2.service_calls["turn_off"])
run_coroutine_threadsafe(ump.async_turn_on(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_turn_on(), self.hass.loop).result()
assert 1 == len(self.mock_mp_2.service_calls["turn_on"])
run_coroutine_threadsafe(ump.async_mute_volume(True), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_mute_volume(True), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["mute_volume"])
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
ump.async_set_volume_level(0.5), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["set_volume_level"])
run_coroutine_threadsafe(ump.async_media_play(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_media_play(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["media_play"])
run_coroutine_threadsafe(ump.async_media_pause(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_media_pause(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["media_pause"])
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
ump.async_media_previous_track(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["media_previous_track"])
run_coroutine_threadsafe(ump.async_media_next_track(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_media_next_track(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["media_next_track"])
run_coroutine_threadsafe(ump.async_media_seek(100), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_media_seek(100), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["media_seek"])
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
ump.async_play_media("movie", "batman"), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["play_media"])
run_coroutine_threadsafe(ump.async_volume_up(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_volume_up(), self.hass.loop).result()
assert 1 == len(self.mock_mp_2.service_calls["volume_up"])
run_coroutine_threadsafe(ump.async_volume_down(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_volume_down(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["volume_down"])
run_coroutine_threadsafe(ump.async_media_play_pause(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_media_play_pause(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["media_play_pause"])
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
ump.async_select_source("dvd"), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["select_source"])
run_coroutine_threadsafe(ump.async_clear_playlist(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_clear_playlist(), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["clear_playlist"])
run_coroutine_threadsafe(ump.async_set_shuffle(True), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
ump.async_set_shuffle(True), self.hass.loop
).result()
assert 1 == len(self.mock_mp_2.service_calls["shuffle_set"])
def test_service_call_to_command(self):
@ -707,12 +725,12 @@ class TestMediaPlayer(unittest.TestCase):
ump = universal.UniversalMediaPlayer(self.hass, **config)
ump.entity_id = media_player.ENTITY_ID_FORMAT.format(config["name"])
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
self.mock_mp_2._state = STATE_PLAYING
self.mock_mp_2.schedule_update_ha_state()
self.hass.block_till_done()
run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_update(), self.hass.loop).result()
run_coroutine_threadsafe(ump.async_turn_off(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(ump.async_turn_off(), self.hass.loop).result()
assert 1 == len(service)

View File

@ -1,9 +1,9 @@
"""The tests for the uptime sensor platform."""
import asyncio
import unittest
from unittest.mock import patch
from datetime import timedelta
from homeassistant.util.async_ import run_coroutine_threadsafe
from homeassistant.setup import setup_component
from homeassistant.components.uptime.sensor import UptimeSensor
from tests.common import get_test_home_assistant
@ -46,11 +46,15 @@ class TestUptimeSensor(unittest.TestCase):
assert sensor.unit_of_measurement == "days"
new_time = sensor.initial + timedelta(days=1)
with patch("homeassistant.util.dt.now", return_value=new_time):
run_coroutine_threadsafe(sensor.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
sensor.async_update(), self.hass.loop
).result()
assert sensor.state == 1.00
new_time = sensor.initial + timedelta(days=111.499)
with patch("homeassistant.util.dt.now", return_value=new_time):
run_coroutine_threadsafe(sensor.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
sensor.async_update(), self.hass.loop
).result()
assert sensor.state == 111.50
def test_uptime_sensor_hours_output(self):
@ -59,11 +63,15 @@ class TestUptimeSensor(unittest.TestCase):
assert sensor.unit_of_measurement == "hours"
new_time = sensor.initial + timedelta(hours=16)
with patch("homeassistant.util.dt.now", return_value=new_time):
run_coroutine_threadsafe(sensor.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
sensor.async_update(), self.hass.loop
).result()
assert sensor.state == 16.00
new_time = sensor.initial + timedelta(hours=72.499)
with patch("homeassistant.util.dt.now", return_value=new_time):
run_coroutine_threadsafe(sensor.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
sensor.async_update(), self.hass.loop
).result()
assert sensor.state == 72.50
def test_uptime_sensor_minutes_output(self):
@ -72,9 +80,13 @@ class TestUptimeSensor(unittest.TestCase):
assert sensor.unit_of_measurement == "minutes"
new_time = sensor.initial + timedelta(minutes=16)
with patch("homeassistant.util.dt.now", return_value=new_time):
run_coroutine_threadsafe(sensor.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
sensor.async_update(), self.hass.loop
).result()
assert sensor.state == 16.00
new_time = sensor.initial + timedelta(minutes=12.499)
with patch("homeassistant.util.dt.now", return_value=new_time):
run_coroutine_threadsafe(sensor.async_update(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
sensor.async_update(), self.hass.loop
).result()
assert sensor.state == 12.50

View File

@ -15,7 +15,6 @@ import pytest
import homeassistant.core as ha
from homeassistant.exceptions import InvalidEntityFormatError, InvalidStateError
from homeassistant.util.async_ import run_coroutine_threadsafe
import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM
from homeassistant.const import (
@ -191,7 +190,7 @@ class TestHomeAssistant(unittest.TestCase):
for _ in range(3):
self.hass.add_job(test_coro())
run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe(
asyncio.wait(self.hass._pending_tasks), loop=self.hass.loop
).result()
@ -216,7 +215,9 @@ class TestHomeAssistant(unittest.TestCase):
yield from asyncio.sleep(0)
yield from asyncio.sleep(0)
run_coroutine_threadsafe(wait_finish_callback(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
wait_finish_callback(), self.hass.loop
).result()
assert len(self.hass._pending_tasks) == 2
self.hass.block_till_done()
@ -239,7 +240,9 @@ class TestHomeAssistant(unittest.TestCase):
for _ in range(2):
self.hass.add_job(test_executor)
run_coroutine_threadsafe(wait_finish_callback(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
wait_finish_callback(), self.hass.loop
).result()
assert len(self.hass._pending_tasks) == 2
self.hass.block_till_done()
@ -263,7 +266,9 @@ class TestHomeAssistant(unittest.TestCase):
for _ in range(2):
self.hass.add_job(test_callback)
run_coroutine_threadsafe(wait_finish_callback(), self.hass.loop).result()
asyncio.run_coroutine_threadsafe(
wait_finish_callback(), self.hass.loop
).result()
self.hass.block_till_done()

View File

@ -9,43 +9,6 @@ import pytest
from homeassistant.util import async_ as hasync
@patch("asyncio.coroutines.iscoroutine")
@patch("concurrent.futures.Future")
@patch("threading.get_ident")
def test_run_coroutine_threadsafe_from_inside_event_loop(
mock_ident, _, mock_iscoroutine
):
"""Testing calling run_coroutine_threadsafe from inside an event loop."""
coro = MagicMock()
loop = MagicMock()
loop._thread_ident = None
mock_ident.return_value = 5
mock_iscoroutine.return_value = True
hasync.run_coroutine_threadsafe(coro, loop)
assert len(loop.call_soon_threadsafe.mock_calls) == 1
loop._thread_ident = 5
mock_ident.return_value = 5
mock_iscoroutine.return_value = True
with pytest.raises(RuntimeError):
hasync.run_coroutine_threadsafe(coro, loop)
assert len(loop.call_soon_threadsafe.mock_calls) == 1
loop._thread_ident = 1
mock_ident.return_value = 5
mock_iscoroutine.return_value = False
with pytest.raises(TypeError):
hasync.run_coroutine_threadsafe(coro, loop)
assert len(loop.call_soon_threadsafe.mock_calls) == 1
loop._thread_ident = 1
mock_ident.return_value = 5
mock_iscoroutine.return_value = True
hasync.run_coroutine_threadsafe(coro, loop)
assert len(loop.call_soon_threadsafe.mock_calls) == 2
@patch("asyncio.coroutines.iscoroutine")
@patch("concurrent.futures.Future")
@patch("threading.get_ident")
@ -187,49 +150,6 @@ class RunThreadsafeTests(TestCase):
finally:
future.done() or future.cancel()
def test_run_coroutine_threadsafe(self):
"""Test coroutine submission from a thread to an event loop."""
future = self.loop.run_in_executor(None, self.target_coroutine)
result = self.loop.run_until_complete(future)
self.assertEqual(result, 3)
def test_run_coroutine_threadsafe_with_exception(self):
"""Test coroutine submission from thread to event loop on exception."""
future = self.loop.run_in_executor(None, self.target_coroutine, True)
with self.assertRaises(RuntimeError) as exc_context:
self.loop.run_until_complete(future)
self.assertIn("Fail!", exc_context.exception.args)
def test_run_coroutine_threadsafe_with_invalid(self):
"""Test coroutine submission from thread to event loop on invalid."""
callback = lambda: self.target_coroutine(invalid=True) # noqa
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(ValueError) as exc_context:
self.loop.run_until_complete(future)
self.assertIn("Invalid!", exc_context.exception.args)
def test_run_coroutine_threadsafe_with_timeout(self):
"""Test coroutine submission from thread to event loop on timeout."""
callback = lambda: self.target_coroutine(timeout=0) # noqa
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(asyncio.TimeoutError):
self.loop.run_until_complete(future)
self.run_briefly(self.loop)
# Check that there's no pending task (add has been cancelled)
if sys.version_info[:2] >= (3, 7):
all_tasks = asyncio.all_tasks
else:
all_tasks = asyncio.Task.all_tasks
for task in all_tasks(self.loop):
self.assertTrue(task.done())
def test_run_coroutine_threadsafe_task_cancelled(self):
"""Test coroutine submission from tread to event loop on cancel."""
callback = lambda: self.target_coroutine(cancel=True) # noqa
future = self.loop.run_in_executor(None, callback)
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(future)
def test_run_callback_threadsafe(self):
"""Test callback submission from a thread to an event loop."""
future = self.loop.run_in_executor(None, self.target_callback)