Collection of tests improvements (#33778)

This commit is contained in:
Franck Nijhof 2020-04-07 18:36:35 +02:00 committed by GitHub
parent bee742994e
commit bb8bbc9c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 342 additions and 286 deletions

View File

@ -1,233 +1,207 @@
"""The tests for the Demo Media player platform.""" """The tests for the Demo Media player platform."""
import asyncio from asynctest import patch
import unittest
from unittest.mock import patch
import pytest import pytest
import voluptuous as vol import voluptuous as vol
import homeassistant.components.media_player as mp import homeassistant.components.media_player as mp
from homeassistant.helpers.aiohttp_client import DATA_CLIENTSESSION from homeassistant.helpers.aiohttp_client import DATA_CLIENTSESSION
from homeassistant.setup import async_setup_component, setup_component from homeassistant.setup import async_setup_component
from tests.common import get_test_home_assistant
from tests.components.media_player import common from tests.components.media_player import common
entity_id = "media_player.walkman" TEST_ENTITY_ID = "media_player.walkman"
class TestDemoMediaPlayer(unittest.TestCase): @pytest.fixture(name="mock_media_seek")
"""Test the media_player module.""" def media_player_media_seek_fixture():
"""Mock demo YouTube player media seek."""
def setUp(self): # pylint: disable=invalid-name with patch(
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
def tearDown(self):
"""Shut down test instance."""
self.hass.stop()
def test_source_select(self):
"""Test the input source service."""
entity_id = "media_player.lounge_room"
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
state = self.hass.states.get(entity_id)
assert "dvd" == state.attributes.get("source")
with pytest.raises(vol.Invalid):
common.select_source(self.hass, None, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert "dvd" == state.attributes.get("source")
common.select_source(self.hass, "xbox", entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert "xbox" == state.attributes.get("source")
def test_clear_playlist(self):
"""Test clear playlist."""
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
assert self.hass.states.is_state(entity_id, "playing")
common.clear_playlist(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "off")
def test_volume_services(self):
"""Test the volume service."""
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
state = self.hass.states.get(entity_id)
assert 1.0 == state.attributes.get("volume_level")
with pytest.raises(vol.Invalid):
common.set_volume_level(self.hass, None, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 1.0 == state.attributes.get("volume_level")
common.set_volume_level(self.hass, 0.5, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 0.5 == state.attributes.get("volume_level")
common.volume_down(self.hass, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 0.4 == state.attributes.get("volume_level")
common.volume_up(self.hass, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 0.5 == state.attributes.get("volume_level")
assert False is state.attributes.get("is_volume_muted")
with pytest.raises(vol.Invalid):
common.mute_volume(self.hass, None, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert False is state.attributes.get("is_volume_muted")
common.mute_volume(self.hass, True, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert True is state.attributes.get("is_volume_muted")
def test_turning_off_and_on(self):
"""Test turn_on and turn_off."""
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
assert self.hass.states.is_state(entity_id, "playing")
common.turn_off(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "off")
assert not mp.is_on(self.hass, entity_id)
common.turn_on(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "playing")
common.toggle(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "off")
assert not mp.is_on(self.hass, entity_id)
def test_playing_pausing(self):
"""Test media_pause."""
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
assert self.hass.states.is_state(entity_id, "playing")
common.media_pause(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "paused")
common.media_play_pause(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "playing")
common.media_play_pause(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "paused")
common.media_play(self.hass, entity_id)
self.hass.block_till_done()
assert self.hass.states.is_state(entity_id, "playing")
def test_prev_next_track(self):
"""Test media_next_track and media_previous_track ."""
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
state = self.hass.states.get(entity_id)
assert 1 == state.attributes.get("media_track")
common.media_next_track(self.hass, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 2 == state.attributes.get("media_track")
common.media_next_track(self.hass, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 3 == state.attributes.get("media_track")
common.media_previous_track(self.hass, entity_id)
self.hass.block_till_done()
state = self.hass.states.get(entity_id)
assert 2 == state.attributes.get("media_track")
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
ent_id = "media_player.lounge_room"
state = self.hass.states.get(ent_id)
assert 1 == state.attributes.get("media_episode")
common.media_next_track(self.hass, ent_id)
self.hass.block_till_done()
state = self.hass.states.get(ent_id)
assert 2 == state.attributes.get("media_episode")
common.media_previous_track(self.hass, ent_id)
self.hass.block_till_done()
state = self.hass.states.get(ent_id)
assert 1 == state.attributes.get("media_episode")
def test_play_media(self):
"""Test play_media ."""
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
ent_id = "media_player.living_room"
state = self.hass.states.get(ent_id)
assert 0 < (mp.SUPPORT_PLAY_MEDIA & state.attributes.get("supported_features"))
assert state.attributes.get("media_content_id") is not None
with pytest.raises(vol.Invalid):
common.play_media(self.hass, None, "some_id", ent_id)
self.hass.block_till_done()
state = self.hass.states.get(ent_id)
assert 0 < (mp.SUPPORT_PLAY_MEDIA & state.attributes.get("supported_features"))
assert not "some_id" == state.attributes.get("media_content_id")
common.play_media(self.hass, "youtube", "some_id", ent_id)
self.hass.block_till_done()
state = self.hass.states.get(ent_id)
assert 0 < (mp.SUPPORT_PLAY_MEDIA & state.attributes.get("supported_features"))
assert "some_id" == state.attributes.get("media_content_id")
@patch(
"homeassistant.components.demo.media_player.DemoYoutubePlayer.media_seek", "homeassistant.components.demo.media_player.DemoYoutubePlayer.media_seek",
autospec=True, autospec=True,
) as seek:
yield seek
async def test_source_select(hass):
"""Test the input source service."""
entity_id = "media_player.lounge_room"
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
) )
def test_seek(self, mock_seek): state = hass.states.get(entity_id)
"""Test seek.""" assert state.attributes.get("source") == "dvd"
assert setup_component(
self.hass, mp.DOMAIN, {"media_player": {"platform": "demo"}} with pytest.raises(vol.Invalid):
) await common.async_select_source(hass, None, entity_id)
ent_id = "media_player.living_room" state = hass.states.get(entity_id)
state = self.hass.states.get(ent_id) assert state.attributes.get("source") == "dvd"
assert state.attributes["supported_features"] & mp.SUPPORT_SEEK
assert not mock_seek.called await common.async_select_source(hass, "xbox", entity_id)
with pytest.raises(vol.Invalid): state = hass.states.get(entity_id)
common.media_seek(self.hass, None, ent_id) assert state.attributes.get("source") == "xbox"
self.hass.block_till_done()
assert not mock_seek.called
common.media_seek(self.hass, 100, ent_id) async def test_clear_playlist(hass):
self.hass.block_till_done() """Test clear playlist."""
assert mock_seek.called assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
assert hass.states.is_state(TEST_ENTITY_ID, "playing")
await common.async_clear_playlist(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "off")
async def test_volume_services(hass):
"""Test the volume service."""
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("volume_level") == 1.0
with pytest.raises(vol.Invalid):
await common.async_set_volume_level(hass, None, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("volume_level") == 1.0
await common.async_set_volume_level(hass, 0.5, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("volume_level") == 0.5
await common.async_volume_down(hass, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("volume_level") == 0.4
await common.async_volume_up(hass, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("volume_level") == 0.5
assert False is state.attributes.get("is_volume_muted")
with pytest.raises(vol.Invalid):
await common.async_mute_volume(hass, None, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("is_volume_muted") is False
await common.async_mute_volume(hass, True, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("is_volume_muted") is True
async def test_turning_off_and_on(hass):
"""Test turn_on and turn_off."""
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
assert hass.states.is_state(TEST_ENTITY_ID, "playing")
await common.async_turn_off(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "off")
assert not mp.is_on(hass, TEST_ENTITY_ID)
await common.async_turn_on(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "playing")
await common.async_toggle(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "off")
assert not mp.is_on(hass, TEST_ENTITY_ID)
async def test_playing_pausing(hass):
"""Test media_pause."""
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
assert hass.states.is_state(TEST_ENTITY_ID, "playing")
await common.async_media_pause(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "paused")
await common.async_media_play_pause(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "playing")
await common.async_media_play_pause(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "paused")
await common.async_media_play(hass, TEST_ENTITY_ID)
assert hass.states.is_state(TEST_ENTITY_ID, "playing")
async def test_prev_next_track(hass):
"""Test media_next_track and media_previous_track ."""
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("media_track") == 1
await common.async_media_next_track(hass, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("media_track") == 2
await common.async_media_next_track(hass, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("media_track") == 3
await common.async_media_previous_track(hass, TEST_ENTITY_ID)
state = hass.states.get(TEST_ENTITY_ID)
assert state.attributes.get("media_track") == 2
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
ent_id = "media_player.lounge_room"
state = hass.states.get(ent_id)
assert state.attributes.get("media_episode") == 1
await common.async_media_next_track(hass, ent_id)
state = hass.states.get(ent_id)
assert state.attributes.get("media_episode") == 2
await common.async_media_previous_track(hass, ent_id)
state = hass.states.get(ent_id)
assert state.attributes.get("media_episode") == 1
async def test_play_media(hass):
"""Test play_media ."""
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
ent_id = "media_player.living_room"
state = hass.states.get(ent_id)
assert mp.SUPPORT_PLAY_MEDIA & state.attributes.get("supported_features") > 0
assert state.attributes.get("media_content_id") is not None
with pytest.raises(vol.Invalid):
await common.async_play_media(hass, None, "some_id", ent_id)
state = hass.states.get(ent_id)
assert mp.SUPPORT_PLAY_MEDIA & state.attributes.get("supported_features") > 0
assert state.attributes.get("media_content_id") != "some_id"
await common.async_play_media(hass, "youtube", "some_id", ent_id)
state = hass.states.get(ent_id)
assert mp.SUPPORT_PLAY_MEDIA & state.attributes.get("supported_features") > 0
assert state.attributes.get("media_content_id") == "some_id"
async def test_seek(hass, mock_media_seek):
"""Test seek."""
assert await async_setup_component(
hass, mp.DOMAIN, {"media_player": {"platform": "demo"}}
)
ent_id = "media_player.living_room"
state = hass.states.get(ent_id)
assert state.attributes["supported_features"] & mp.SUPPORT_SEEK
assert not mock_media_seek.called
with pytest.raises(vol.Invalid):
await common.async_media_seek(hass, None, ent_id)
assert not mock_media_seek.called
await common.async_media_seek(hass, 100, ent_id)
assert mock_media_seek.called
async def test_media_image_proxy(hass, hass_client): async def test_media_image_proxy(hass, hass_client):
@ -239,30 +213,34 @@ async def test_media_image_proxy(hass, hass_client):
fake_picture_data = "test.test" fake_picture_data = "test.test"
class MockResponse: class MockResponse:
"""Test response."""
def __init__(self): def __init__(self):
"""Test response init."""
self.status = 200 self.status = 200
self.headers = {"Content-Type": "sometype"} self.headers = {"Content-Type": "sometype"}
@asyncio.coroutine async def read(self):
def read(self): """Test response read."""
return fake_picture_data.encode("ascii") return fake_picture_data.encode("ascii")
@asyncio.coroutine async def release(self):
def release(self): """Test response release."""
pass
class MockWebsession: class MockWebsession:
@asyncio.coroutine """Test websession."""
def get(self, url):
async def get(self, url):
"""Test websession get."""
return MockResponse() return MockResponse()
def detach(self): def detach(self):
pass """Test websession detach."""
hass.data[DATA_CLIENTSESSION] = MockWebsession() hass.data[DATA_CLIENTSESSION] = MockWebsession()
assert hass.states.is_state(entity_id, "playing") assert hass.states.is_state(TEST_ENTITY_ID, "playing")
state = hass.states.get(entity_id) state = hass.states.get(TEST_ENTITY_ID)
client = await hass_client() client = await hass_client()
req = await client.get(state.attributes.get("entity_picture")) req = await client.get(state.attributes.get("entity_picture"))
assert req.status == 200 assert req.status == 200

View File

@ -37,115 +37,192 @@ from homeassistant.const import (
from homeassistant.loader import bind_hass from homeassistant.loader import bind_hass
async def async_turn_on(hass, entity_id=ENTITY_MATCH_ALL):
"""Turn on specified media player or all."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
await hass.services.async_call(DOMAIN, SERVICE_TURN_ON, data, blocking=True)
@bind_hass @bind_hass
def turn_on(hass, entity_id=ENTITY_MATCH_ALL): def turn_on(hass, entity_id=ENTITY_MATCH_ALL):
"""Turn on specified media player or all.""" """Turn on specified media player or all."""
hass.add_job(async_turn_on, hass, entity_id)
async def async_turn_off(hass, entity_id=ENTITY_MATCH_ALL):
"""Turn off specified media player or all."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_TURN_ON, data) await hass.services.async_call(DOMAIN, SERVICE_TURN_OFF, data, blocking=True)
@bind_hass @bind_hass
def turn_off(hass, entity_id=ENTITY_MATCH_ALL): def turn_off(hass, entity_id=ENTITY_MATCH_ALL):
"""Turn off specified media player or all.""" """Turn off specified media player or all."""
hass.add_job(async_turn_off, hass, entity_id)
async def async_toggle(hass, entity_id=ENTITY_MATCH_ALL):
"""Toggle specified media player or all."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_TURN_OFF, data) await hass.services.async_call(DOMAIN, SERVICE_TOGGLE, data, blocking=True)
@bind_hass @bind_hass
def toggle(hass, entity_id=ENTITY_MATCH_ALL): def toggle(hass, entity_id=ENTITY_MATCH_ALL):
"""Toggle specified media player or all.""" """Toggle specified media player or all."""
hass.add_job(async_toggle, hass, entity_id)
async def async_volume_up(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for volume up."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_TOGGLE, data) await hass.services.async_call(DOMAIN, SERVICE_VOLUME_UP, data, blocking=True)
@bind_hass @bind_hass
def volume_up(hass, entity_id=ENTITY_MATCH_ALL): def volume_up(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for volume up.""" """Send the media player the command for volume up."""
hass.add_job(async_volume_up, hass, entity_id)
async def async_volume_down(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for volume down."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_VOLUME_UP, data) await hass.services.async_call(DOMAIN, SERVICE_VOLUME_DOWN, data, blocking=True)
@bind_hass @bind_hass
def volume_down(hass, entity_id=ENTITY_MATCH_ALL): def volume_down(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for volume down.""" """Send the media player the command for volume down."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} hass.add_job(async_volume_down, hass, entity_id)
hass.services.call(DOMAIN, SERVICE_VOLUME_DOWN, data)
@bind_hass async def async_mute_volume(hass, mute, entity_id=ENTITY_MATCH_ALL):
def mute_volume(hass, mute, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for muting the volume.""" """Send the media player the command for muting the volume."""
data = {ATTR_MEDIA_VOLUME_MUTED: mute} data = {ATTR_MEDIA_VOLUME_MUTED: mute}
if entity_id: if entity_id:
data[ATTR_ENTITY_ID] = entity_id data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_VOLUME_MUTE, data) await hass.services.async_call(DOMAIN, SERVICE_VOLUME_MUTE, data, blocking=True)
@bind_hass @bind_hass
def set_volume_level(hass, volume, entity_id=ENTITY_MATCH_ALL): def mute_volume(hass, mute, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for muting the volume."""
hass.add_job(async_mute_volume, hass, mute, entity_id)
async def async_set_volume_level(hass, volume, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for setting the volume.""" """Send the media player the command for setting the volume."""
data = {ATTR_MEDIA_VOLUME_LEVEL: volume} data = {ATTR_MEDIA_VOLUME_LEVEL: volume}
if entity_id: if entity_id:
data[ATTR_ENTITY_ID] = entity_id data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_VOLUME_SET, data) await hass.services.async_call(DOMAIN, SERVICE_VOLUME_SET, data, blocking=True)
@bind_hass
def set_volume_level(hass, volume, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for setting the volume."""
hass.add_job(async_set_volume_level, hass, volume, entity_id)
async def async_media_play_pause(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for play/pause."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
await hass.services.async_call(
DOMAIN, SERVICE_MEDIA_PLAY_PAUSE, data, blocking=True
)
@bind_hass @bind_hass
def media_play_pause(hass, entity_id=ENTITY_MATCH_ALL): def media_play_pause(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for play/pause."""
hass.add_job(async_media_play_pause, hass, entity_id)
async def async_media_play(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for play/pause.""" """Send the media player the command for play/pause."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PLAY_PAUSE, data) await hass.services.async_call(DOMAIN, SERVICE_MEDIA_PLAY, data, blocking=True)
@bind_hass @bind_hass
def media_play(hass, entity_id=ENTITY_MATCH_ALL): def media_play(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for play/pause.""" """Send the media player the command for play/pause."""
hass.add_job(async_media_play, hass, entity_id)
async def async_media_pause(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for pause."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PLAY, data) await hass.services.async_call(DOMAIN, SERVICE_MEDIA_PAUSE, data, blocking=True)
@bind_hass @bind_hass
def media_pause(hass, entity_id=ENTITY_MATCH_ALL): def media_pause(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for pause.""" """Send the media player the command for pause."""
hass.add_job(async_media_pause, hass, entity_id)
async def async_media_stop(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for stop."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PAUSE, data) await hass.services.async_call(DOMAIN, SERVICE_MEDIA_STOP, data, blocking=True)
@bind_hass @bind_hass
def media_stop(hass, entity_id=ENTITY_MATCH_ALL): def media_stop(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for stop.""" """Send the media player the command for stop."""
hass.add_job(async_media_stop, hass, entity_id)
async def async_media_next_track(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for next track."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_STOP, data) await hass.services.async_call(
DOMAIN, SERVICE_MEDIA_NEXT_TRACK, data, blocking=True
)
@bind_hass @bind_hass
def media_next_track(hass, entity_id=ENTITY_MATCH_ALL): def media_next_track(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for next track.""" """Send the media player the command for next track."""
hass.add_job(async_media_next_track, hass, entity_id)
async def async_media_previous_track(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for prev track."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_NEXT_TRACK, data) await hass.services.async_call(
DOMAIN, SERVICE_MEDIA_PREVIOUS_TRACK, data, blocking=True
)
@bind_hass @bind_hass
def media_previous_track(hass, entity_id=ENTITY_MATCH_ALL): def media_previous_track(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for prev track.""" """Send the media player the command for prev track."""
hass.add_job(async_media_previous_track, hass, entity_id)
async def async_media_seek(hass, position, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command to seek in current playing media."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
hass.services.call(DOMAIN, SERVICE_MEDIA_PREVIOUS_TRACK, data) data[ATTR_MEDIA_SEEK_POSITION] = position
await hass.services.async_call(DOMAIN, SERVICE_MEDIA_SEEK, data, blocking=True)
@bind_hass @bind_hass
def media_seek(hass, position, entity_id=ENTITY_MATCH_ALL): def media_seek(hass, position, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command to seek in current playing media.""" """Send the media player the command to seek in current playing media."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} hass.add_job(async_media_seek, hass, position, entity_id)
data[ATTR_MEDIA_SEEK_POSITION] = position
hass.services.call(DOMAIN, SERVICE_MEDIA_SEEK, data)
@bind_hass async def async_play_media(
def play_media(hass, media_type, media_id, entity_id=ENTITY_MATCH_ALL, enqueue=None): hass, media_type, media_id, entity_id=ENTITY_MATCH_ALL, enqueue=None
):
"""Send the media player the command for playing media.""" """Send the media player the command for playing media."""
data = {ATTR_MEDIA_CONTENT_TYPE: media_type, ATTR_MEDIA_CONTENT_ID: media_id} data = {ATTR_MEDIA_CONTENT_TYPE: media_type, ATTR_MEDIA_CONTENT_ID: media_id}
@ -155,22 +232,38 @@ def play_media(hass, media_type, media_id, entity_id=ENTITY_MATCH_ALL, enqueue=N
if enqueue: if enqueue:
data[ATTR_MEDIA_ENQUEUE] = enqueue data[ATTR_MEDIA_ENQUEUE] = enqueue
hass.services.call(DOMAIN, SERVICE_PLAY_MEDIA, data) await hass.services.async_call(DOMAIN, SERVICE_PLAY_MEDIA, data, blocking=True)
@bind_hass @bind_hass
def select_source(hass, source, entity_id=ENTITY_MATCH_ALL): def play_media(hass, media_type, media_id, entity_id=ENTITY_MATCH_ALL, enqueue=None):
"""Send the media player the command for playing media."""
hass.add_job(async_play_media, hass, media_type, media_id, entity_id, enqueue)
async def async_select_source(hass, source, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command to select input source.""" """Send the media player the command to select input source."""
data = {ATTR_INPUT_SOURCE: source} data = {ATTR_INPUT_SOURCE: source}
if entity_id: if entity_id:
data[ATTR_ENTITY_ID] = entity_id data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_SELECT_SOURCE, data) await hass.services.async_call(DOMAIN, SERVICE_SELECT_SOURCE, data, blocking=True)
@bind_hass
def select_source(hass, source, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command to select input source."""
hass.add_job(async_select_source, hass, source, entity_id)
async def async_clear_playlist(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for clear playlist."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {}
await hass.services.async_call(DOMAIN, SERVICE_CLEAR_PLAYLIST, data, blocking=True)
@bind_hass @bind_hass
def clear_playlist(hass, entity_id=ENTITY_MATCH_ALL): def clear_playlist(hass, entity_id=ENTITY_MATCH_ALL):
"""Send the media player the command for clear playlist.""" """Send the media player the command for clear playlist."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else {} hass.add_job(async_clear_playlist, hass, entity_id)
hass.services.call(DOMAIN, SERVICE_CLEAR_PLAYLIST, data)

View File

@ -95,7 +95,7 @@ class TestShellCommand(unittest.TestCase):
self.hass.block_till_done() self.hass.block_till_done()
cmd = mock_call.mock_calls[0][1][0] cmd = mock_call.mock_calls[0][1][0]
assert 1 == mock_call.call_count assert mock_call.call_count == 1
assert "ls /bin" == cmd assert "ls /bin" == cmd
@patch( @patch(
@ -121,7 +121,7 @@ class TestShellCommand(unittest.TestCase):
self.hass.block_till_done() self.hass.block_till_done()
cmd = mock_call.mock_calls[0][1] cmd = mock_call.mock_calls[0][1]
assert 1 == mock_call.call_count assert mock_call.call_count == 1
assert ("ls", "/bin", "Works") == cmd assert ("ls", "/bin", "Works") == cmd
@patch( @patch(
@ -143,8 +143,8 @@ class TestShellCommand(unittest.TestCase):
self.hass.services.call("shell_command", "test_service", blocking=True) self.hass.services.call("shell_command", "test_service", blocking=True)
self.hass.block_till_done() self.hass.block_till_done()
assert 1 == mock_call.call_count assert mock_call.call_count == 1
assert 1 == mock_error.call_count assert mock_error.call_count == 1
assert not os.path.isfile(path) assert not os.path.isfile(path)
@patch("homeassistant.components.shell_command._LOGGER.debug") @patch("homeassistant.components.shell_command._LOGGER.debug")
@ -160,7 +160,7 @@ class TestShellCommand(unittest.TestCase):
self.hass.services.call("shell_command", "test_service", blocking=True) self.hass.services.call("shell_command", "test_service", blocking=True)
self.hass.block_till_done() self.hass.block_till_done()
assert 1 == mock_output.call_count assert mock_output.call_count == 1
assert test_phrase.encode() + b"\n" == mock_output.call_args_list[0][0][-1] assert test_phrase.encode() + b"\n" == mock_output.call_args_list[0][0][-1]
@patch("homeassistant.components.shell_command._LOGGER.debug") @patch("homeassistant.components.shell_command._LOGGER.debug")
@ -176,5 +176,5 @@ class TestShellCommand(unittest.TestCase):
self.hass.services.call("shell_command", "test_service", blocking=True) self.hass.services.call("shell_command", "test_service", blocking=True)
self.hass.block_till_done() self.hass.block_till_done()
assert 1 == mock_output.call_count assert mock_output.call_count == 1
assert test_phrase.encode() + b"\n" == mock_output.call_args_list[0][0][-1] assert test_phrase.encode() + b"\n" == mock_output.call_args_list[0][0][-1]

View File

@ -1,41 +1,26 @@
"""The test for the version sensor platform.""" """The test for the version sensor platform."""
import asyncio
import unittest
from unittest.mock import patch from unittest.mock import patch
from homeassistant.setup import setup_component from homeassistant.setup import async_setup_component
from tests.common import get_test_home_assistant
MOCK_VERSION = "10.0" MOCK_VERSION = "10.0"
class TestVersionSensor(unittest.TestCase): async def test_version_sensor(hass):
"""Test the Version sensor.""" """Test the Version sensor."""
config = {"sensor": {"platform": "version"}}
def setup_method(self, method): assert await async_setup_component(hass, "sensor", config)
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
def teardown_method(self, method):
"""Stop everything that was started."""
self.hass.stop()
def test_version_sensor(self): async def test_version(hass):
"""Test the Version sensor.""" """Test the Version sensor."""
config = {"sensor": {"platform": "version"}} config = {"sensor": {"platform": "version", "name": "test"}}
assert setup_component(self.hass, "sensor", config) with patch("homeassistant.const.__version__", MOCK_VERSION):
assert await async_setup_component(hass, "sensor", config)
await hass.async_block_till_done()
@asyncio.coroutine state = hass.states.get("sensor.test")
def test_version(self):
"""Test the Version sensor."""
config = {"sensor": {"platform": "version", "name": "test"}}
with patch("homeassistant.const.__version__", MOCK_VERSION): assert state.state == "10.0"
assert setup_component(self.hass, "sensor", config)
self.hass.block_till_done()
state = self.hass.states.get("sensor.test")
assert state.state == "10.0"