Add ability to browse (and play) HEOS media (#140433)

* Add browse and play

* Tests

* Add tests involving media source
This commit is contained in:
Andrew Sayre 2025-03-17 14:10:56 -05:00 committed by GitHub
parent 4dfb56a2f7
commit 52d86ede3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 602 additions and 10 deletions

View File

@ -3,27 +3,35 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable, Coroutine, Sequence
from contextlib import suppress
from datetime import datetime
from functools import reduce, wraps
import logging
from operator import ior
from typing import Any
from typing import Any, Final
from pyheos import (
AddCriteriaType,
ControlType,
HeosError,
HeosPlayer,
MediaItem,
MediaMusicSource,
MediaType as HeosMediaType,
PlayState,
RepeatType,
const as heos_const,
)
from pyheos.util import mediauri as heos_source
import voluptuous as vol
from homeassistant.components import media_source
from homeassistant.components.media_player import (
ATTR_MEDIA_ENQUEUE,
ATTR_MEDIA_VOLUME_LEVEL,
BrowseError,
BrowseMedia,
MediaClass,
MediaPlayerEnqueue,
MediaPlayerEntity,
MediaPlayerEntityFeature,
@ -32,6 +40,7 @@ from homeassistant.components.media_player import (
RepeatMode,
async_process_play_media_url,
)
from homeassistant.components.media_source import BrowseMediaSource
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
@ -55,6 +64,8 @@ from .coordinator import HeosConfigEntry, HeosCoordinator
PARALLEL_UPDATES = 0
BROWSE_ROOT: Final = "heos://media"
BASE_SUPPORTED_FEATURES = (
MediaPlayerEntityFeature.VOLUME_MUTE
| MediaPlayerEntityFeature.VOLUME_SET
@ -97,6 +108,21 @@ HEOS_HA_REPEAT_TYPE_MAP = {
}
HA_HEOS_REPEAT_TYPE_MAP = {v: k for k, v in HEOS_HA_REPEAT_TYPE_MAP.items()}
HEOS_MEDIA_TYPE_TO_MEDIA_CLASS = {
HeosMediaType.ALBUM: MediaClass.ALBUM,
HeosMediaType.ARTIST: MediaClass.ARTIST,
HeosMediaType.CONTAINER: MediaClass.DIRECTORY,
HeosMediaType.GENRE: MediaClass.GENRE,
HeosMediaType.HEOS_SERVER: MediaClass.DIRECTORY,
HeosMediaType.HEOS_SERVICE: MediaClass.DIRECTORY,
HeosMediaType.MUSIC_SERVICE: MediaClass.DIRECTORY,
HeosMediaType.PLAYLIST: MediaClass.PLAYLIST,
HeosMediaType.SONG: MediaClass.TRACK,
HeosMediaType.STATION: MediaClass.TRACK,
}
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
@ -282,6 +308,16 @@ class HeosMediaPlayer(CoordinatorEntity[HeosCoordinator], MediaPlayerEntity):
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play a piece of media."""
if heos_source.is_media_uri(media_id):
media, data = heos_source.from_media_uri(media_id)
if not isinstance(media, MediaItem):
raise ValueError(f"Invalid media id '{media_id}'")
await self._player.play_media(
media,
HA_HEOS_ENQUEUE_MAP[kwargs.get(ATTR_MEDIA_ENQUEUE)],
)
return
if media_source.is_media_source_id(media_id):
media_type = MediaType.URL
play_item = await media_source.async_resolve_media(
@ -534,14 +570,101 @@ class HeosMediaPlayer(CoordinatorEntity[HeosCoordinator], MediaPlayerEntity):
"""Volume level of the media player (0..1)."""
return self._player.volume / 100
async def _async_browse_media_root(self) -> BrowseMedia:
"""Return media browsing root."""
if not self.coordinator.heos.music_sources:
try:
await self.coordinator.heos.get_music_sources()
except HeosError as error:
_LOGGER.debug("Unable to load music sources: %s", error)
children: list[BrowseMedia] = [
_media_to_browse_media(source)
for source in self.coordinator.heos.music_sources.values()
if source.available
]
root = BrowseMedia(
title="Music Sources",
media_class=MediaClass.DIRECTORY,
children_media_class=MediaClass.DIRECTORY,
media_content_type="",
media_content_id=BROWSE_ROOT,
can_expand=True,
can_play=False,
children=children,
)
# Append media source items
with suppress(BrowseError):
browse = await self._async_browse_media_source()
# If domain is None, it's an overview of available sources
if browse.domain is None and browse.children:
children.extend(browse.children)
else:
children.append(browse)
return root
async def _async_browse_heos_media(self, media_content_id: str) -> BrowseMedia:
"""Browse a HEOS media item."""
media, data = heos_source.from_media_uri(media_content_id)
browse_media = _media_to_browse_media(media)
try:
browse_result = await self.coordinator.heos.browse_media(media)
except HeosError as error:
_LOGGER.debug("Unable to browse media %s: %s", media, error)
else:
browse_media.children = [
_media_to_browse_media(item)
for item in browse_result.items
if item.browsable or item.playable
]
return browse_media
async def _async_browse_media_source(
self, media_content_id: str | None = None
) -> BrowseMediaSource:
"""Browse a media source item."""
return await media_source.async_browse_media(
self.hass,
media_content_id,
content_filter=lambda item: item.media_content_type.startswith("audio/"),
)
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the websocket media browsing helper."""
return await media_source.async_browse_media(
self.hass,
media_content_id,
content_filter=lambda item: item.media_content_type.startswith("audio/"),
if media_content_id in (None, BROWSE_ROOT):
return await self._async_browse_media_root()
assert media_content_id is not None
if heos_source.is_media_uri(media_content_id):
return await self._async_browse_heos_media(media_content_id)
if media_source.is_media_source_id(media_content_id):
return await self._async_browse_media_source(media_content_id)
raise ServiceValidationError(
translation_domain=HEOS_DOMAIN,
translation_key="unsupported_media_content_id",
translation_placeholders={"media_content_id": media_content_id},
)
def _media_to_browse_media(media: MediaItem | MediaMusicSource) -> BrowseMedia:
"""Convert a HEOS media item to a browse media item."""
can_expand = False
can_play = False
if isinstance(media, MediaMusicSource):
can_expand = media.available
else:
can_expand = media.browsable
can_play = media.playable
return BrowseMedia(
can_expand=can_expand,
can_play=can_play,
media_content_id=heos_source.to_media_uri(media),
media_content_type="",
media_class=HEOS_MEDIA_TYPE_TO_MEDIA_CLASS[media.type],
title=media.name,
thumbnail=media.image_url,
)

View File

@ -146,6 +146,9 @@
},
"unknown_source": {
"message": "Unknown source: {source}"
},
"unsupported_media_content_id": {
"message": "Unsupported media_content_id: {media_content_id}"
}
},
"issues": {

View File

@ -2,7 +2,14 @@
from unittest.mock import AsyncMock
from pyheos import ConnectionState, Heos, HeosGroup, HeosOptions, HeosPlayer
from pyheos import (
ConnectionState,
Heos,
HeosGroup,
HeosOptions,
HeosPlayer,
MediaMusicSource,
)
class MockHeos(Heos):
@ -13,6 +20,7 @@ class MockHeos(Heos):
super().__init__(options)
# Overwrite the methods with async mocks, changing type
self.add_to_queue: AsyncMock = AsyncMock()
self.browse_media: AsyncMock = AsyncMock()
self.connect: AsyncMock = AsyncMock()
self.disconnect: AsyncMock = AsyncMock()
self.get_favorites: AsyncMock = AsyncMock()
@ -20,6 +28,7 @@ class MockHeos(Heos):
self.get_input_sources: AsyncMock = AsyncMock()
self.get_playlists: AsyncMock = AsyncMock()
self.get_players: AsyncMock = AsyncMock()
self.get_music_sources: AsyncMock = AsyncMock()
self.group_volume_down: AsyncMock = AsyncMock()
self.group_volume_up: AsyncMock = AsyncMock()
self.get_system_info: AsyncMock = AsyncMock()
@ -68,3 +77,13 @@ class MockHeos(Heos):
def mock_set_current_host(self, host: str) -> None:
"""Set the current host on the mock instance."""
self._connection._host = host
def mock_set_music_sources(
self, music_sources: dict[int, MediaMusicSource]
) -> None:
"""Set the music sources on the mock instance."""
for music_source in music_sources.values():
music_source.heos = self
self._music_sources = music_sources
self._music_sources_loaded = bool(music_sources)
self.get_music_sources.return_value = music_sources

View File

@ -6,6 +6,7 @@ from collections.abc import Callable, Iterator
from unittest.mock import Mock, patch
from pyheos import (
BrowseResult,
HeosGroup,
HeosHost,
HeosNowPlayingMedia,
@ -14,6 +15,7 @@ from pyheos import (
HeosSystem,
LineOutLevelType,
MediaItem,
MediaMusicSource,
MediaType,
NetworkType,
PlayerUpdateResult,
@ -294,10 +296,10 @@ def quick_selects_fixture() -> dict[int, str]:
}
@pytest.fixture(name="playlists")
def playlists_fixture() -> list[MediaItem]:
"""Create favorites fixture."""
playlist = MediaItem(
@pytest.fixture(name="playlist")
def playlist_fixture() -> MediaItem:
"""Create playlist fixture."""
return MediaItem(
source_id=const.MUSIC_SOURCE_PLAYLISTS,
name="Awesome Music",
type=MediaType.PLAYLIST,
@ -306,6 +308,44 @@ def playlists_fixture() -> list[MediaItem]:
image_url="",
heos=None,
)
@pytest.fixture(name="music_sources")
def music_sources_fixture() -> dict[int, MediaMusicSource]:
"""Create music sources fixture."""
return {
const.MUSIC_SOURCE_PANDORA: MediaMusicSource(
source_id=const.MUSIC_SOURCE_PANDORA,
name="Pandora",
type=MediaType.MUSIC_SERVICE,
available=True,
service_username="user",
image_url="",
heos=None,
),
const.MUSIC_SOURCE_TUNEIN: MediaMusicSource(
source_id=const.MUSIC_SOURCE_TUNEIN,
name="TuneIn",
type=MediaType.MUSIC_SERVICE,
available=False,
service_username=None,
image_url="",
heos=None,
),
}
@pytest.fixture(name="pandora_browse_result")
def pandora_browse_response_fixture(favorites: dict[int, MediaItem]) -> BrowseResult:
"""Create a mock response for browsing Pandora."""
return BrowseResult(
1, 1, const.MUSIC_SOURCE_PANDORA, items=[favorites[1]], options=[]
)
@pytest.fixture(name="playlists")
def playlists_fixture(playlist: MediaItem) -> list[MediaItem]:
"""Create playlists fixture."""
return [playlist]

View File

@ -1,4 +1,144 @@
# serializer version: 1
# name: test_browse_media_heos_media
dict({
'can_expand': True,
'can_play': False,
'children': list([
dict({
'can_expand': False,
'can_play': True,
'children_media_class': None,
'media_class': 'track',
'media_content_id': 'heos://media/1/station?name=Today%27s+Hits+Radio&image_url=&playable=True&browsable=False&media_id=123456789',
'media_content_type': '',
'thumbnail': '',
'title': "Today's Hits Radio",
}),
]),
'children_media_class': 'track',
'media_class': 'directory',
'media_content_id': 'heos://media/1/music_service?name=Pandora&image_url=&available=True&service_username=user',
'media_content_type': '',
'not_shown': 0,
'thumbnail': '',
'title': 'Pandora',
})
# ---
# name: test_browse_media_heos_media_error_returns_empty
dict({
'can_expand': True,
'can_play': False,
'children': list([
]),
'children_media_class': None,
'media_class': 'directory',
'media_content_id': 'heos://media/1/music_service?name=Pandora&image_url=&available=True&service_username=user',
'media_content_type': '',
'not_shown': 0,
'thumbnail': '',
'title': 'Pandora',
})
# ---
# name: test_browse_media_media_source
dict({
'can_expand': True,
'can_play': False,
'children': list([
dict({
'can_expand': False,
'can_play': True,
'children_media_class': None,
'media_class': 'music',
'media_content_id': 'media-source://media_source/local/test.mp3',
'media_content_type': 'audio/mpeg',
'thumbnail': None,
'title': 'test.mp3',
}),
]),
'children_media_class': 'music',
'media_class': 'directory',
'media_content_id': 'media-source://media_source/local/.',
'media_content_type': '',
'not_shown': 1,
'thumbnail': None,
'title': 'media',
})
# ---
# name: test_browse_media_root
dict({
'can_expand': True,
'can_play': False,
'children': list([
dict({
'can_expand': True,
'can_play': False,
'children_media_class': None,
'media_class': 'directory',
'media_content_id': 'heos://media/1/music_service?name=Pandora&image_url=&available=True&service_username=user',
'media_content_type': '',
'thumbnail': '',
'title': 'Pandora',
}),
dict({
'can_expand': True,
'can_play': False,
'children_media_class': 'music',
'media_class': 'directory',
'media_content_id': 'media-source://media_source/local/.',
'media_content_type': '',
'thumbnail': None,
'title': 'media',
}),
]),
'children_media_class': 'directory',
'media_class': 'directory',
'media_content_id': 'heos://media',
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'Music Sources',
})
# ---
# name: test_browse_media_root_no_media_source
dict({
'can_expand': True,
'can_play': False,
'children': list([
dict({
'can_expand': True,
'can_play': False,
'children_media_class': None,
'media_class': 'directory',
'media_content_id': 'heos://media/1/music_service?name=Pandora&image_url=&available=True&service_username=user',
'media_content_type': '',
'thumbnail': '',
'title': 'Pandora',
}),
]),
'children_media_class': 'directory',
'media_class': 'directory',
'media_content_id': 'heos://media',
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'Music Sources',
})
# ---
# name: test_browse_media_root_source_error_continues
dict({
'can_expand': True,
'can_play': False,
'children': list([
]),
'children_media_class': 'directory',
'media_class': 'directory',
'media_content_id': 'heos://media',
'media_content_type': '',
'not_shown': 0,
'thumbnail': None,
'title': 'Music Sources',
})
# ---
# name: test_state_attributes
StateSnapshot({
'attributes': ReadOnlyDict({

View File

@ -7,9 +7,11 @@ from typing import Any
from freezegun.api import FrozenDateTimeFactory
from pyheos import (
AddCriteriaType,
BrowseResult,
CommandFailedError,
HeosError,
MediaItem,
MediaMusicSource,
MediaType as HeosMediaType,
PlayerUpdateResult,
PlayState,
@ -18,6 +20,7 @@ from pyheos import (
SignalType,
const,
)
from pyheos.util import mediauri
import pytest
from syrupy.assertion import SnapshotAssertion
from syrupy.filters import props
@ -51,6 +54,7 @@ from homeassistant.components.media_player import (
MediaType,
RepeatMode,
)
from homeassistant.components.media_source import DOMAIN as MS_DOMAIN
from homeassistant.const import (
ATTR_ENTITY_ID,
SERVICE_MEDIA_NEXT_TRACK,
@ -73,6 +77,8 @@ from homeassistant.helpers import device_registry as dr, entity_registry as er
from . import MockHeos
from tests.common import MockConfigEntry, async_fire_time_changed
from tests.conftest import async_setup_component
from tests.typing import WebSocketGenerator
async def test_state_attributes(
@ -1239,6 +1245,267 @@ async def test_play_media_invalid_type(
)
async def test_play_media_media_uri(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
playlist: MediaItem,
) -> None:
"""Test the play media service with HEOS media uri."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
media_content_id = mediauri.to_media_uri(playlist)
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_ID: media_content_id,
ATTR_MEDIA_CONTENT_TYPE: "",
},
blocking=True,
)
controller.play_media.assert_called_once()
async def test_play_media_media_uri_invalid(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
) -> None:
"""Test the play media service with an invalid HEOS media uri raises."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
media_id = "heos://media/1/music_service?name=Pandora&available=False&image_url="
with pytest.raises(
HomeAssistantError,
match=re.escape(f"Unable to play media: Invalid media id '{media_id}'"),
):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_ID: media_id,
ATTR_MEDIA_CONTENT_TYPE: "",
},
blocking=True,
)
controller.play_media.assert_not_called()
async def test_play_media_music_source_url(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
) -> None:
"""Test the play media service with a music source url."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await async_setup_component(hass, MS_DOMAIN, {MS_DOMAIN: {}})
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.test_player",
ATTR_MEDIA_CONTENT_ID: "media-source://media_source/local/test.mp3",
ATTR_MEDIA_CONTENT_TYPE: "",
},
blocking=True,
)
controller.play_url.assert_called_once()
async def test_browse_media_root(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
music_sources: dict[int, MediaMusicSource],
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test browsing the root."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await async_setup_component(hass, MS_DOMAIN, {MS_DOMAIN: {}})
controller.mock_set_music_sources(music_sources)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == snapshot
async def test_browse_media_root_no_media_source(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
music_sources: dict[int, MediaMusicSource],
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test browsing the root."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
controller.mock_set_music_sources(music_sources)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == snapshot
async def test_browse_media_root_source_error_continues(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
snapshot: SnapshotAssertion,
) -> None:
"""Test browsing the root with an error getting sources continues."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
controller.get_music_sources.side_effect = HeosError("error")
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == snapshot
assert "Unable to load music sources" in caplog.text
async def test_browse_media_heos_media(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
hass_ws_client: WebSocketGenerator,
pandora_browse_result: BrowseResult,
snapshot: SnapshotAssertion,
) -> None:
"""Test browsing a heos media item."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
controller.browse_media.return_value = pandora_browse_result
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "heos://media/1/music_service?name=Pandora&image_url=&available=True&service_username=user",
"media_content_type": "",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == snapshot
async def test_browse_media_heos_media_error_returns_empty(
hass: HomeAssistant,
config_entry: MockConfigEntry,
controller: MockHeos,
hass_ws_client: WebSocketGenerator,
caplog: pytest.LogCaptureFixture,
snapshot: SnapshotAssertion,
) -> None:
"""Test browsing a heos media item results in an error, returns empty children."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
controller.browse_media.side_effect = HeosError("error")
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "heos://media/1/music_service?name=Pandora&image_url=&available=True&service_username=user",
"media_content_type": "",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == snapshot
assert "Unable to browse media" in caplog.text
async def test_browse_media_media_source(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
snapshot: SnapshotAssertion,
) -> None:
"""Test browsing a media source."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await async_setup_component(hass, MS_DOMAIN, {MS_DOMAIN: {}})
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "media-source://media_source/local/.",
"media_content_type": "",
}
)
response = await client.receive_json()
assert response["success"]
assert response["result"] == snapshot
async def test_browse_media_invalid_content_id(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test browsing an invalid content id fails."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": "media_player.test_player",
"media_content_id": "invalid",
"media_content_type": "",
}
)
response = await client.receive_json()
assert not response["success"]
@pytest.mark.parametrize(
("members", "expected"),
[