Extend play_media support (#23580)

This commit is contained in:
Andrew Sayre 2019-05-02 19:54:36 -05:00 committed by GitHub
parent 6130831a43
commit b30afde8ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 189 additions and 14 deletions

View File

@ -7,10 +7,11 @@ from typing import Sequence
from homeassistant.components.media_player import MediaPlayerDevice
from homeassistant.components.media_player.const import (
DOMAIN, MEDIA_TYPE_MUSIC, MEDIA_TYPE_URL, SUPPORT_CLEAR_PLAYLIST,
SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PLAY, SUPPORT_PLAY_MEDIA,
SUPPORT_PREVIOUS_TRACK, SUPPORT_SELECT_SOURCE, SUPPORT_SHUFFLE_SET,
SUPPORT_STOP, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_STEP)
ATTR_MEDIA_ENQUEUE, DOMAIN, MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_URL, SUPPORT_CLEAR_PLAYLIST, SUPPORT_NEXT_TRACK, SUPPORT_PAUSE,
SUPPORT_PLAY, SUPPORT_PLAY_MEDIA, SUPPORT_PREVIOUS_TRACK,
SUPPORT_SELECT_SOURCE, SUPPORT_SHUFFLE_SET, SUPPORT_STOP,
SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_STEP)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_IDLE, STATE_PAUSED, STATE_PLAYING
from homeassistant.helpers.typing import HomeAssistantType
@ -49,7 +50,8 @@ def log_command_error(command: str):
from pyheos import CommandError
try:
await func(*args, **kwargs)
except (CommandError, asyncio.TimeoutError, ConnectionError) as ex:
except (CommandError, asyncio.TimeoutError, ConnectionError,
ValueError) as ex:
_LOGGER.error("Unable to %s: %s", command, ex)
return wrapper
return decorator
@ -167,9 +169,49 @@ class HeosMediaPlayer(MediaPlayerDevice):
"""Play a piece of media."""
if media_type == MEDIA_TYPE_URL:
await self._player.play_url(media_id)
else:
_LOGGER.error("Unable to play media: Unsupported media type '%s'",
media_type)
return
if media_type == "quick_select":
# media_id may be an int or a str
selects = await self._player.get_quick_selects()
try:
index = int(media_id)
except ValueError:
# Try finding index by name
index = next((index for index, select in selects.items()
if select == media_id), None)
if index is None:
raise ValueError("Invalid quick select '{}'".format(media_id))
await self._player.play_quick_select(index)
return
if media_type == MEDIA_TYPE_PLAYLIST:
from pyheos import const
playlists = await self._player.heos.get_playlists()
playlist = next((p for p in playlists if p.name == media_id), None)
if not playlist:
raise ValueError("Invalid playlist '{}'".format(media_id))
add_queue_option = const.ADD_QUEUE_ADD_TO_END \
if kwargs.get(ATTR_MEDIA_ENQUEUE) \
else const.ADD_QUEUE_REPLACE_AND_PLAY
await self._player.add_to_queue(playlist, add_queue_option)
return
if media_type == "favorite":
# media_id may be an int or str
try:
index = int(media_id)
except ValueError:
# Try finding index by name
index = next((index for index, favorite
in self._source_manager.favorites.items()
if favorite.name == media_id), None)
if index is None:
raise ValueError("Invalid favorite '{}'".format(media_id))
await self._player.play_favorite(index)
return
raise ValueError("Unsupported media type '{}'".format(media_type))
@log_command_error("select source")
async def async_select_source(self, source):

View File

@ -19,15 +19,19 @@ def config_entry_fixture():
@pytest.fixture(name="controller")
def controller_fixture(players, favorites, input_sources, dispatcher):
def controller_fixture(
players, favorites, input_sources, playlists, dispatcher):
"""Create a mock Heos controller fixture."""
with patch("pyheos.Heos", autospec=True) as mock:
mock_heos = mock.return_value
for player in players.values():
player.heos = mock_heos
mock_heos.dispatcher = dispatcher
mock_heos.get_players.return_value = players
mock_heos.players = players
mock_heos.get_favorites.return_value = favorites
mock_heos.get_input_sources.return_value = input_sources
mock_heos.get_playlists.return_value = playlists
mock_heos.is_signed_in = True
mock_heos.signed_in_username = "user@user.com"
yield mock_heos
@ -42,10 +46,9 @@ def config_fixture():
@pytest.fixture(name="players")
def player_fixture(dispatcher):
def player_fixture(quick_selects):
"""Create a mock HeosPlayer."""
player = Mock(HeosPlayer)
player.heos.dispatcher = dispatcher
player.player_id = 1
player.name = "Test Player"
player.model = "Test Model"
@ -71,6 +74,7 @@ def player_fixture(dispatcher):
player.now_playing_media.current_position = None
player.now_playing_media.image_url = "http://"
player.now_playing_media.song = "Song"
player.get_quick_selects.return_value = quick_selects
return {player.player_id: player}
@ -123,3 +127,25 @@ def discovery_data_fixture() -> dict:
'udn': 'uuid:e61de70c-2250-1c22-0080-0005cdf512be',
'upnp_device_type': 'urn:schemas-denon-com:device:AiosDevice:1'
}
@pytest.fixture(name="quick_selects")
def quick_selects_fixture() -> Dict[int, str]:
"""Create a dict of quick selects for testing."""
return {
1: "Quick Select 1",
2: "Quick Select 2",
3: "Quick Select 3",
4: "Quick Select 4",
5: "Quick Select 5",
6: "Quick Select 6"
}
@pytest.fixture(name="playlists")
def playlists_fixture() -> Sequence[HeosSource]:
"""Create favorites fixture."""
playlist = Mock(HeosSource)
playlist.type = const.TYPE_PLAYLIST
playlist.name = "Awesome Music"
return [playlist]

View File

@ -9,9 +9,10 @@ from homeassistant.components.heos.const import (
from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE, ATTR_INPUT_SOURCE_LIST, ATTR_MEDIA_ALBUM_NAME,
ATTR_MEDIA_ARTIST, ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_TYPE,
ATTR_MEDIA_DURATION, ATTR_MEDIA_POSITION, ATTR_MEDIA_POSITION_UPDATED_AT,
ATTR_MEDIA_SHUFFLE, ATTR_MEDIA_TITLE, ATTR_MEDIA_VOLUME_LEVEL,
ATTR_MEDIA_VOLUME_MUTED, DOMAIN as MEDIA_PLAYER_DOMAIN, MEDIA_TYPE_MUSIC,
ATTR_MEDIA_DURATION, ATTR_MEDIA_ENQUEUE, ATTR_MEDIA_POSITION,
ATTR_MEDIA_POSITION_UPDATED_AT, ATTR_MEDIA_SHUFFLE, ATTR_MEDIA_TITLE,
ATTR_MEDIA_VOLUME_LEVEL, ATTR_MEDIA_VOLUME_MUTED,
DOMAIN as MEDIA_PLAYER_DOMAIN, MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_URL, SERVICE_CLEAR_PLAYLIST, SERVICE_PLAY_MEDIA,
SERVICE_SELECT_SOURCE, SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PLAY,
SUPPORT_PREVIOUS_TRACK, SUPPORT_STOP)
@ -463,6 +464,112 @@ async def test_play_media_url(hass, config_entry, config, controller, caplog):
assert "Unable to play media: Failure (1)" in caplog.text
async def test_play_media_quick_select(
hass, config_entry, config, controller, caplog, quick_selects):
"""Test the play media service with type quick_select."""
await setup_platform(hass, config_entry, config)
player = controller.players[1]
quick_select = list(quick_selects.items())[0]
index = quick_select[0]
name = quick_select[1]
# Play by index
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: 'quick_select',
ATTR_MEDIA_CONTENT_ID: str(index)}, blocking=True)
player.play_quick_select.assert_called_once_with(index)
# Play by name
player.play_quick_select.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: 'quick_select',
ATTR_MEDIA_CONTENT_ID: name}, blocking=True)
player.play_quick_select.assert_called_once_with(index)
# Invalid name
player.play_quick_select.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: 'quick_select',
ATTR_MEDIA_CONTENT_ID: "Invalid"}, blocking=True)
assert player.play_quick_select.call_count == 0
assert "Unable to play media: Invalid quick select 'Invalid'" \
in caplog.text
async def test_play_media_playlist(
hass, config_entry, config, controller, caplog, playlists):
"""Test the play media service with type playlist."""
await setup_platform(hass, config_entry, config)
player = controller.players[1]
playlist = playlists[0]
# Play without enqueing
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_PLAYLIST,
ATTR_MEDIA_CONTENT_ID: playlist.name}, blocking=True)
player.add_to_queue.assert_called_once_with(
playlist, const.ADD_QUEUE_REPLACE_AND_PLAY)
# Play with enqueing
player.add_to_queue.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_PLAYLIST,
ATTR_MEDIA_CONTENT_ID: playlist.name,
ATTR_MEDIA_ENQUEUE: True}, blocking=True)
player.add_to_queue.assert_called_once_with(
playlist, const.ADD_QUEUE_ADD_TO_END)
# Invalid name
player.add_to_queue.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_PLAYLIST,
ATTR_MEDIA_CONTENT_ID: 'Invalid'}, blocking=True)
assert player.add_to_queue.call_count == 0
assert "Unable to play media: Invalid playlist 'Invalid'" \
in caplog.text
async def test_play_media_favorite(
hass, config_entry, config, controller, caplog, favorites):
"""Test the play media service with type favorite."""
await setup_platform(hass, config_entry, config)
player = controller.players[1]
quick_select = list(favorites.items())[0]
index = quick_select[0]
name = quick_select[1].name
# Play by index
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: 'favorite',
ATTR_MEDIA_CONTENT_ID: str(index)}, blocking=True)
player.play_favorite.assert_called_once_with(index)
# Play by name
player.play_favorite.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: 'favorite',
ATTR_MEDIA_CONTENT_ID: name}, blocking=True)
player.play_favorite.assert_called_once_with(index)
# Invalid name
player.play_favorite.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_PLAY_MEDIA,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_CONTENT_TYPE: 'favorite',
ATTR_MEDIA_CONTENT_ID: "Invalid"}, blocking=True)
assert player.play_favorite.call_count == 0
assert "Unable to play media: Invalid favorite 'Invalid'" \
in caplog.text
async def test_play_media_invalid_type(
hass, config_entry, config, controller, caplog):
"""Test the play media service with an invalid type."""