Add mediabrowser search to music assistant (#143851)

* add search to music assistant

* fix: copy paste error

* refactor: remove unnecessary hasattr condition checks

* refactor: clean up type hinting for mypy
This commit is contained in:
Jozef Kruszynski 2025-04-30 19:40:41 +02:00 committed by GitHub
parent 5ccb9486e0
commit 30656a4e72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 495 additions and 15 deletions

View File

@ -2,9 +2,15 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import logging
from typing import TYPE_CHECKING, Any, cast
from music_assistant_models.media_items import MediaItemType
from music_assistant_models.enums import MediaType as MASSMediaType
from music_assistant_models.media_items import (
BrowseFolder,
MediaItemType,
SearchResults,
)
from homeassistant.components import media_source
from homeassistant.components.media_player import (
@ -12,6 +18,9 @@ from homeassistant.components.media_player import (
BrowseMedia,
MediaClass,
MediaType,
SearchError,
SearchMedia,
SearchMediaQuery,
)
from homeassistant.core import HomeAssistant
@ -20,17 +29,17 @@ from .const import DEFAULT_NAME, DOMAIN
if TYPE_CHECKING:
from music_assistant_client import MusicAssistantClient
MEDIA_TYPE_RADIO = "radio"
MEDIA_TYPE_PODCAST_EPISODE = "podcast_episode"
MEDIA_TYPE_AUDIOBOOK = "audiobook"
MEDIA_TYPE_RADIO = "radio"
PLAYABLE_MEDIA_TYPES = [
MediaType.PLAYLIST,
MediaType.ALBUM,
MediaType.ARTIST,
MEDIA_TYPE_AUDIOBOOK,
MediaType.PLAYLIST,
MediaType.PODCAST,
MEDIA_TYPE_RADIO,
MediaType.PODCAST,
MEDIA_TYPE_AUDIOBOOK,
MediaType.TRACK,
]
@ -66,6 +75,7 @@ LIBRARY_MEDIA_CLASS_MAP = {
MEDIA_CONTENT_TYPE_FLAC = "audio/flac"
THUMB_SIZE = 200
SORT_NAME_DESC = "sort_name_desc"
LOGGER = logging.getLogger(__name__)
def media_source_filter(item: BrowseMedia) -> bool:
@ -418,3 +428,205 @@ def build_item(
can_expand=can_expand,
thumbnail=img_url,
)
async def _search_within_album(
mass: MusicAssistantClient, album_uri: str, search_query: str, limit: int
) -> SearchMedia:
"""Search for tracks within a specific album."""
album = await mass.music.get_item_by_uri(album_uri)
tracks = await mass.music.get_album_tracks(album.item_id, album.provider)
# Filter tracks by search query
filtered_tracks = [
track
for track in tracks
if search_query.lower() in track.name.lower() and track.available
]
return SearchMedia(
result=[
build_item(mass, track, can_expand=False)
for track in filtered_tracks[:limit]
]
)
async def _search_within_artist(
mass: MusicAssistantClient, artist_uri: str, search_query: str, limit: int
) -> SearchResults:
"""Search for content within an artist's catalog."""
artist = await mass.music.get_item_by_uri(artist_uri)
search_query = f"{artist.name} - {search_query}"
return await mass.music.search(
search_query,
media_types=[MASSMediaType.ALBUM, MASSMediaType.TRACK],
limit=limit,
)
def _get_media_types_from_query(query: SearchMediaQuery) -> list[MASSMediaType]:
"""Map query to Music Assistant media types."""
media_types: list[MASSMediaType] = []
match query.media_content_type:
case MediaType.ARTIST:
media_types = [MASSMediaType.ARTIST]
case MediaType.ALBUM:
media_types = [MASSMediaType.ALBUM]
case MediaType.TRACK:
media_types = [MASSMediaType.TRACK]
case MediaType.PLAYLIST:
media_types = [MASSMediaType.PLAYLIST]
case "radio":
media_types = [MASSMediaType.RADIO]
case "audiobook":
media_types = [MASSMediaType.AUDIOBOOK]
case MediaType.PODCAST:
media_types = [MASSMediaType.PODCAST]
case _:
# No specific type selected
if query.media_filter_classes:
# Map MediaClass to search types
mapping = {
MediaClass.ARTIST: MASSMediaType.ARTIST,
MediaClass.ALBUM: MASSMediaType.ALBUM,
MediaClass.TRACK: MASSMediaType.TRACK,
MediaClass.PLAYLIST: MASSMediaType.PLAYLIST,
MediaClass.MUSIC: MASSMediaType.RADIO,
MediaClass.DIRECTORY: MASSMediaType.AUDIOBOOK,
MediaClass.PODCAST: MASSMediaType.PODCAST,
}
media_types = [
mapping[cls] for cls in query.media_filter_classes if cls in mapping
]
# Default to all types if none specified
if not media_types:
media_types = [
MASSMediaType.ARTIST,
MASSMediaType.ALBUM,
MASSMediaType.TRACK,
MASSMediaType.PLAYLIST,
MASSMediaType.RADIO,
MASSMediaType.AUDIOBOOK,
MASSMediaType.PODCAST,
]
return media_types
def _process_search_results(
mass: MusicAssistantClient,
search_results: SearchResults,
media_types: list[MASSMediaType],
) -> list[BrowseMedia]:
"""Process search results into BrowseMedia items."""
result: list[BrowseMedia] = []
# Process search results for each media type
for media_type in media_types:
# Get items for each media type using pattern matching
items: list[MediaItemType] = []
match media_type:
case MASSMediaType.ARTIST if search_results.artists:
# Cast to ensure type safety
items = cast(list[MediaItemType], search_results.artists)
case MASSMediaType.ALBUM if search_results.albums:
items = cast(list[MediaItemType], search_results.albums)
case MASSMediaType.TRACK if search_results.tracks:
items = cast(list[MediaItemType], search_results.tracks)
case MASSMediaType.PLAYLIST if search_results.playlists:
items = cast(list[MediaItemType], search_results.playlists)
case MASSMediaType.RADIO if search_results.radio:
items = cast(list[MediaItemType], search_results.radio)
case MASSMediaType.PODCAST if search_results.podcasts:
items = cast(list[MediaItemType], search_results.podcasts)
case MASSMediaType.AUDIOBOOK if search_results.audiobooks:
items = cast(list[MediaItemType], search_results.audiobooks)
case _:
continue
# Add available items to results
for item in items:
if TYPE_CHECKING:
assert not isinstance(item, BrowseFolder)
if not item.available:
continue
# Create browse item
# Convert to string to get the original value since we're using MASSMediaType enum
str_media_type = media_type.value.lower()
can_expand = _should_expand_media_type(str_media_type)
media_class = _get_media_class_for_type(str_media_type)
browse_item = build_item(
mass,
item,
can_expand=can_expand,
media_class=media_class,
)
result.append(browse_item)
return result
def _should_expand_media_type(media_type: str) -> bool:
"""Determine if a media type should be expandable."""
return media_type in ("artist", "album", "playlist", "podcast")
def _get_media_class_for_type(media_type: str) -> MediaClass | None:
"""Get the appropriate media class for a given media type."""
mapping = {
"artist": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_ARTISTS],
"album": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_ALBUMS],
"track": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_TRACKS],
"playlist": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_PLAYLISTS],
"radio": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_RADIO],
"podcast": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_PODCASTS],
"audiobook": LIBRARY_MEDIA_CLASS_MAP[LIBRARY_AUDIOBOOKS],
}
return mapping.get(media_type)
async def async_search_media(
mass: MusicAssistantClient,
query: SearchMediaQuery,
) -> SearchMedia:
"""Search media."""
try:
search_query = query.search_query
limit = 5 # Default limit per media type
search_results: SearchResults | None = None
# Handle media_content_id if provided (for contextual searches)
if query.media_content_id:
if "album/" in query.media_content_id:
return await _search_within_album(
mass, query.media_content_id, search_query, limit
)
if "artist/" in query.media_content_id:
# For artists, we already run a search, so save the results
search_results = await _search_within_artist(
mass, query.media_content_id, search_query, limit
)
# Determine which media types to search
media_types = _get_media_types_from_query(query)
# Execute search using the Music Assistant API if we haven't already done so
if search_results is None:
search_results = await mass.music.search(
search_query, media_types=media_types, limit=limit
)
# Process the search results
result = _process_search_results(mass, search_results, media_types)
return SearchMedia(result=result)
except Exception as err:
LOGGER.debug(
"Search error details for %s: %s", query.search_query, err, exc_info=True
)
raise SearchError(f"Error searching for {query.search_query}") from err

View File

@ -36,6 +36,8 @@ from homeassistant.components.media_player import (
MediaPlayerState,
MediaType as HAMediaType,
RepeatMode,
SearchMedia,
SearchMediaQuery,
async_process_play_media_url,
)
from homeassistant.const import ATTR_NAME, STATE_OFF
@ -74,7 +76,7 @@ from .const import (
DOMAIN,
)
from .entity import MusicAssistantEntity
from .media_browser import async_browse_media
from .media_browser import async_browse_media, async_search_media
from .schemas import QUEUE_DETAILS_SCHEMA, queue_item_dict_from_mass_item
if TYPE_CHECKING:
@ -91,6 +93,7 @@ SUPPORTED_FEATURES_BASE = (
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.CLEAR_PLAYLIST
| MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.SEARCH_MEDIA
| MediaPlayerEntityFeature.MEDIA_ENQUEUE
| MediaPlayerEntityFeature.MEDIA_ANNOUNCE
| MediaPlayerEntityFeature.SEEK
@ -596,6 +599,13 @@ class MusicAssistantPlayer(MusicAssistantEntity, MediaPlayerEntity):
media_content_type,
)
async def async_search_media(self, query: SearchMediaQuery) -> SearchMedia:
"""Search media."""
return await async_search_media(
self.mass,
query,
)
def _update_media_image_url(
self, player: Player, queue: PlayerQueue | None
) -> None:

View File

@ -28,7 +28,7 @@
'original_name': None,
'platform': 'music_assistant',
'previous_unique_id': None,
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'translation_key': None,
'unique_id': '00:00:00:00:00:02',
'unit_of_measurement': None,
@ -54,7 +54,7 @@
'media_duration': 300,
'media_position': 0,
'media_title': 'Test Track',
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'volume_level': 0.2,
}),
'context': <ANY>,
@ -94,7 +94,7 @@
'original_name': None,
'platform': 'music_assistant',
'previous_unique_id': None,
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'translation_key': None,
'unique_id': 'test_group_player_1',
'unit_of_measurement': None,
@ -125,7 +125,7 @@
'media_title': 'November Rain',
'repeat': 'all',
'shuffle': True,
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'volume_level': 0.06,
}),
'context': <ANY>,
@ -165,7 +165,7 @@
'original_name': None,
'platform': 'music_assistant',
'previous_unique_id': None,
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'translation_key': None,
'unique_id': '00:00:00:00:00:01',
'unit_of_measurement': None,
@ -181,7 +181,7 @@
]),
'icon': 'mdi:speaker',
'mass_player_type': 'player',
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
}),
'context': <ANY>,
'entity_id': 'media_player.test_player_1',

View File

@ -1,10 +1,18 @@
"""Test Music Assistant media browser implementation."""
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
import pytest
from homeassistant.components.media_player import BrowseError, BrowseMedia, MediaType
from homeassistant.components.media_player import (
BrowseError,
BrowseMedia,
MediaClass,
MediaType,
SearchError,
SearchMedia,
SearchMediaQuery,
)
from homeassistant.components.music_assistant.const import DOMAIN
from homeassistant.components.music_assistant.media_browser import (
LIBRARY_ALBUMS,
@ -14,7 +22,10 @@ from homeassistant.components.music_assistant.media_browser import (
LIBRARY_PODCASTS,
LIBRARY_RADIO,
LIBRARY_TRACKS,
MEDIA_TYPE_AUDIOBOOK,
MEDIA_TYPE_RADIO,
async_browse_media,
async_search_media,
)
from homeassistant.core import HomeAssistant
@ -67,3 +78,249 @@ async def test_browse_media_not_found(
with pytest.raises(BrowseError, match="Media not found: unknown / unknown"):
await async_browse_media(hass, music_assistant_client, "unknown", "unknown")
class MockSearchResults:
"""Mock search results."""
def __init__(self, media_types: list[str]) -> None:
"""Initialize mock search results."""
self.artists = []
self.albums = []
self.tracks = []
self.playlists = []
self.radio = []
self.podcasts = []
self.audiobooks = []
# Create mock items based on requested media types
for media_type in media_types:
items = []
for i in range(5): # Create 5 mock items for each type
item = MagicMock()
item.name = f"Test {media_type} {i}"
item.uri = f"library://{media_type}/{i}"
item.available = True
item.artists = []
media_type_mock = MagicMock()
media_type_mock.value = media_type
item.media_type = media_type_mock
items.append(item)
# Assign to the appropriate attribute
if media_type == "artist":
self.artists = items
elif media_type == "album":
self.albums = items
elif media_type == "track":
self.tracks = items
elif media_type == "playlist":
self.playlists = items
elif media_type == "radio":
self.radio = items
elif media_type == "podcast":
self.podcasts = items
elif media_type == "audiobook":
self.audiobooks = items
@pytest.mark.parametrize(
("search_query", "media_content_type", "expected_items"),
[
# Search for tracks
("track", MediaType.TRACK, 5),
# Search for albums
("album", MediaType.ALBUM, 5),
# Search for artists
("artist", MediaType.ARTIST, 5),
# Search for playlists
("playlist", MediaType.PLAYLIST, 5),
# Search for radio stations
("radio", MEDIA_TYPE_RADIO, 5),
# Search for podcasts
("podcast", MediaType.PODCAST, 5),
# Search for audiobooks
("audiobook", MEDIA_TYPE_AUDIOBOOK, 5),
# Search with no media type specified (should return all types)
("music", None, 35),
],
)
async def test_search_media(
hass: HomeAssistant,
music_assistant_client: MagicMock,
search_query: str,
media_content_type: str,
expected_items: int,
) -> None:
"""Test the async_search_media method with different content types."""
await setup_integration_from_fixtures(hass, music_assistant_client)
# Create mock search results
media_types = []
if media_content_type == MediaType.TRACK:
media_types = ["track"]
elif media_content_type == MediaType.ALBUM:
media_types = ["album"]
elif media_content_type == MediaType.ARTIST:
media_types = ["artist"]
elif media_content_type == MediaType.PLAYLIST:
media_types = ["playlist"]
elif media_content_type == MEDIA_TYPE_RADIO:
media_types = ["radio"]
elif media_content_type == MediaType.PODCAST:
media_types = ["podcast"]
elif media_content_type == MEDIA_TYPE_AUDIOBOOK:
media_types = ["audiobook"]
elif media_content_type is None:
media_types = [
"artist",
"album",
"track",
"playlist",
"radio",
"podcast",
"audiobook",
]
mock_results = MockSearchResults(media_types)
# Use patch instead of trying to mock return_value
with patch.object(
music_assistant_client.music, "search", return_value=mock_results
):
# Create search query
query = SearchMediaQuery(
search_query=search_query,
media_content_type=media_content_type,
)
# Perform search
search_results = await async_search_media(music_assistant_client, query)
# Verify search results
assert isinstance(search_results, SearchMedia)
if media_content_type is not None:
# For specific media types, expect up to 5 results
assert len(search_results.result) <= 5
else:
# For "all types" search, we'd expect items from each type
# But since we're returning exactly 5 items per type (from mock)
# we'd expect 5 * 7 = 35 items maximum
assert len(search_results.result) <= 35
@pytest.mark.parametrize(
("search_query", "media_filter_classes", "expected_media_types"),
[
# Search for tracks
("track", {MediaClass.TRACK}, ["track"]),
# Search for albums
("album", {MediaClass.ALBUM}, ["album"]),
# Search for artists
("artist", {MediaClass.ARTIST}, ["artist"]),
# Search for playlists
("playlist", {MediaClass.PLAYLIST}, ["playlist"]),
# Search for multiple media classes
("music", {MediaClass.ALBUM, MediaClass.TRACK}, ["album", "track"]),
],
)
async def test_search_media_with_filter_classes(
hass: HomeAssistant,
music_assistant_client: MagicMock,
search_query: str,
media_filter_classes: set[MediaClass],
expected_media_types: list[str],
) -> None:
"""Test the async_search_media method with different media filter classes."""
await setup_integration_from_fixtures(hass, music_assistant_client)
# Create mock search results
mock_results = MockSearchResults(expected_media_types)
# Use patch instead of trying to mock return_value directly
with patch.object(
music_assistant_client.music, "search", return_value=mock_results
):
# Create search query
query = SearchMediaQuery(
search_query=search_query,
media_filter_classes=media_filter_classes,
)
# Perform search
search_results = await async_search_media(music_assistant_client, query)
# Verify search results
assert isinstance(search_results, SearchMedia)
expected_items = len(expected_media_types) * 5 # 5 items per media type
assert len(search_results.result) <= expected_items
async def test_search_media_within_album(
hass: HomeAssistant,
music_assistant_client: MagicMock,
) -> None:
"""Test searching within an album context."""
await setup_integration_from_fixtures(hass, music_assistant_client)
# Mock album and tracks
album = MagicMock()
album.item_id = "396"
album.provider = "library"
tracks = []
for i in range(5):
track = MagicMock()
track.name = f"Test Track {i}"
track.uri = f"library://track/{i}"
track.available = True
track.artists = []
media_type_mock = MagicMock()
media_type_mock.value = "track"
track.media_type = media_type_mock
tracks.append(track)
# Set up mocks using patch
with (
patch.object(
music_assistant_client.music, "get_item_by_uri", return_value=album
),
patch.object(
music_assistant_client.music, "get_album_tracks", return_value=tracks
),
):
# Create search query within an album
album_uri = "library://album/396"
query = SearchMediaQuery(
search_query="track",
media_content_id=album_uri,
)
# Perform search
search_results = await async_search_media(music_assistant_client, query)
# Verify search results
assert isinstance(search_results, SearchMedia)
assert len(search_results.result) > 0 # Should have results
async def test_search_media_error(
hass: HomeAssistant,
music_assistant_client: MagicMock,
) -> None:
"""Test that search errors are properly handled."""
await setup_integration_from_fixtures(hass, music_assistant_client)
# Use patch to cause an exception
with patch.object(
music_assistant_client.music, "search", side_effect=Exception("Search failed")
):
# Create search query
query = SearchMediaQuery(
search_query="error test",
)
# Verify that the error is caught and a SearchError is raised
with pytest.raises(SearchError, match="Error searching for error test"):
await async_search_media(music_assistant_client, query)

View File

@ -651,6 +651,7 @@ async def test_media_player_supported_features(
| MediaPlayerEntityFeature.VOLUME_MUTE
| MediaPlayerEntityFeature.TURN_ON
| MediaPlayerEntityFeature.TURN_OFF
| MediaPlayerEntityFeature.SEARCH_MEDIA
)
assert state.attributes["supported_features"] == expected_features
# remove power control capability from player, trigger subscription callback