Add support for SEARCH_MEDIA feature (#143261)

* initial

* initial

* add tests

* Update for list return

* translate exception

* tests for errors

* review tweaks

* test fix

* force content_type to lowercase

* Allow media_content_type = None

* new test
This commit is contained in:
peteS-UK 2025-05-26 14:01:17 +01:00 committed by GitHub
parent 1c1f5a779b
commit c7745e0d02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 237 additions and 5 deletions

View File

@ -50,21 +50,33 @@ MEDIA_TYPE_TO_SQUEEZEBOX: dict[str | MediaType, str] = {
MediaType.GENRE: "genre",
MediaType.APPS: "apps",
"radios": "radios",
"favorite": "favorite",
}
SQUEEZEBOX_ID_BY_TYPE: dict[str | MediaType, str] = {
MediaType.ALBUM: "album_id",
"albums": "album_id",
MediaType.ARTIST: "artist_id",
"artists": "artist_id",
MediaType.TRACK: "track_id",
"tracks": "track_id",
MediaType.PLAYLIST: "playlist_id",
"playlists": "playlist_id",
MediaType.GENRE: "genre_id",
"genres": "genre_id",
"favorite": "item_id",
"favorites": "item_id",
MediaType.APPS: "item_id",
"app": "item_id",
"radios": "item_id",
"radio": "item_id",
}
CONTENT_TYPE_MEDIA_CLASS: dict[str | MediaType, dict[str, MediaClass | str]] = {
"favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
"favorite": {"item": "favorite", "children": ""},
"radios": {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
"radio": {"item": MediaClass.DIRECTORY, "children": MediaClass.APP},
"artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST},
"albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM},
"tracks": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
@ -100,6 +112,7 @@ CONTENT_TYPE_TO_CHILD_TYPE: dict[
"album artists": MediaType.ARTIST,
MediaType.APPS: MediaType.APP,
MediaType.APP: MediaType.TRACK,
"favorite": None,
}
@ -191,7 +204,7 @@ def _build_response_favorites(item: dict[str, Any]) -> BrowseMedia:
return BrowseMedia(
media_content_id=item["id"],
title=item["title"],
media_content_type="favorites",
media_content_type="favorite",
media_class=CONTENT_TYPE_MEDIA_CLASS[MediaType.TRACK]["item"],
can_expand=bool(item.get("hasitems")),
can_play=bool(item["isaudio"] and item.get("url")),
@ -236,6 +249,7 @@ async def build_item_response(
search_id = payload["search_id"]
search_type = payload["search_type"]
search_query = payload.get("search_query")
assert (
search_type is not None
) # async_browse_media will not call this function if search_type is None
@ -252,6 +266,7 @@ async def build_item_response(
browse_data.media_type_to_squeezebox[search_type],
limit=browse_limit,
browse_id=browse_id,
search_query=search_query,
)
if result is not None and result.get("items"):
@ -261,7 +276,7 @@ async def build_item_response(
for item in result["items"]:
# Force the item id to a string in case it's numeric from some lms
item["id"] = str(item.get("id", ""))
if search_type == "favorites":
if search_type in ["favorites", "favorite"]:
child_media = _build_response_favorites(item)
elif search_type in ["apps", "radios"]:

View File

@ -23,6 +23,8 @@ from homeassistant.components.media_player import (
MediaPlayerState,
MediaType,
RepeatMode,
SearchMedia,
SearchMediaQuery,
async_process_play_media_url,
)
from homeassistant.config_entries import SOURCE_INTEGRATION_DISCOVERY
@ -204,6 +206,7 @@ class SqueezeBoxMediaPlayerEntity(SqueezeboxEntity, MediaPlayerEntity):
| MediaPlayerEntityFeature.GROUPING
| MediaPlayerEntityFeature.MEDIA_ENQUEUE
| MediaPlayerEntityFeature.MEDIA_ANNOUNCE
| MediaPlayerEntityFeature.SEARCH_MEDIA
)
_attr_has_entity_name = True
_attr_name = None
@ -545,6 +548,74 @@ class SqueezeBoxMediaPlayerEntity(SqueezeboxEntity, MediaPlayerEntity):
await self._player.async_index(index)
await self.coordinator.async_refresh()
async def async_search_media(
self,
query: SearchMediaQuery,
) -> SearchMedia:
"""Search the media player."""
_valid_type_list = [
key
for key in self._browse_data.content_type_media_class
if key not in ["apps", "app", "radios", "radio"]
]
_media_content_type_list = (
query.media_content_type.lower().replace(", ", ",").split(",")
if query.media_content_type
else ["albums", "tracks", "artists", "genres"]
)
if query.media_content_type and set(_media_content_type_list).difference(
_valid_type_list
):
_LOGGER.debug("Invalid Media Content Type: %s", query.media_content_type)
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_search_media_content_type",
translation_placeholders={
"media_content_type": ", ".join(_valid_type_list)
},
)
search_response_list: list[BrowseMedia] = []
for _content_type in _media_content_type_list:
payload = {
"search_type": _content_type,
"search_id": query.media_content_id,
"search_query": query.search_query,
}
try:
search_response_list.append(
await build_item_response(
self,
self._player,
payload,
self.browse_limit,
self._browse_data,
)
)
except BrowseError:
_LOGGER.debug("Search Failure: Payload %s", payload)
result: list[BrowseMedia] = []
for search_response in search_response_list:
# Apply the media_filter_classes to the result if specified
if query.media_filter_classes and search_response.children:
search_response.children = [
child
for child in search_response.children
if child.media_content_type in query.media_filter_classes
]
if search_response.children:
result.extend(list(search_response.children))
return SearchMedia(result=result)
async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set the repeat mode."""
if repeat == RepeatMode.ALL:

View File

@ -196,6 +196,9 @@
},
"update_restart_failed": {
"message": "Error trying to update LMS Plugins: Restart failed."
},
"invalid_search_media_content_type": {
"message": "If specified, Media content type must be one of {media_content_type}"
}
}
}

View File

@ -131,11 +131,15 @@ async def mock_async_play_announcement(media_id: str) -> bool:
async def mock_async_browse(
media_type: MediaType, limit: int, browse_id: tuple | None = None
media_type: MediaType,
limit: int,
browse_id: tuple | None = None,
search_query: str | None = None,
) -> dict | None:
"""Mock the async_browse method of pysqueezebox.Player."""
child_types = {
"favorites": "favorites",
"favorite": "favorite",
"new music": "album",
"album artists": "artists",
"albums": "album",
@ -224,6 +228,21 @@ async def mock_async_browse(
"items": fake_items,
}
return None
if search_query:
if search_query not in [x["title"] for x in fake_items]:
return None
for item in fake_items:
if (
item["title"] == search_query
and item["item_type"] == child_types[media_type]
):
return {
"title": media_type,
"items": [item],
}
if (
media_type in MEDIA_TYPE_TO_SQUEEZEBOX.values()
or media_type == "app-fakecommand"

View File

@ -65,7 +65,7 @@
'original_name': None,
'platform': 'squeezebox',
'previous_unique_id': None,
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'translation_key': None,
'unique_id': 'aa:bb:cc:dd:ee:ff',
'unit_of_measurement': None,
@ -84,7 +84,7 @@
}),
'repeat': <RepeatMode.OFF: 'off'>,
'shuffle': False,
'supported_features': <MediaPlayerEntityFeature: 4126655>,
'supported_features': <MediaPlayerEntityFeature: 8320959>,
'volume_level': 0.01,
}),
'context': <ANY>,

View File

@ -10,6 +10,7 @@ from homeassistant.components.media_player import (
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
BrowseError,
MediaClass,
MediaType,
)
from homeassistant.components.squeezebox.browse_media import (
@ -170,6 +171,129 @@ async def test_async_browse_media_for_apps(
assert "Fake Invalid Item 1" not in search
@pytest.mark.parametrize(
("category", "media_filter_classes"),
[
("favorites", None),
("artists", None),
("albums", None),
("playlists", None),
("genres", None),
("new music", None),
("album artists", None),
("albums", [MediaClass.ALBUM]),
],
)
async def test_async_search_media(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
category: str,
media_filter_classes: list[MediaClass] | None,
) -> None:
"""Test each category with subitems."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/search_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": category,
"search_query": "Fake Item 1",
"media_filter_classes": media_filter_classes,
}
)
response = await client.receive_json()
assert response["success"]
category_level = response["result"]["result"]
assert category_level[0]["title"] == "Fake Item 1"
async def test_async_search_media_invalid_filter(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test search_media action with invalid media_filter_class."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/search_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": "albums",
"search_query": "Fake Item 1",
"media_filter_classes": "movie",
}
)
response = await client.receive_json()
assert response["success"]
assert len(response["result"]["result"]) == 0
async def test_async_search_media_invalid_type(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test search_media action with invalid media_content_type."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/search_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": "Fake Type",
"search_query": "Fake Item 1",
},
)
response = await client.receive_json()
assert not response["success"]
err_message = "If specified, Media content type must be one of"
assert err_message in response["error"]["message"]
async def test_async_search_media_not_found(
hass: HomeAssistant,
config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test trying to play an item that doesn't exist."""
with patch(
"homeassistant.components.squeezebox.browse_media.is_internal_request",
return_value=False,
):
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/search_media",
"entity_id": "media_player.test_player",
"media_content_id": "",
"media_content_type": "",
"search_query": "Unknown Item",
},
)
response = await client.receive_json()
assert len(response["result"]["result"]) == 0
async def test_generate_playlist_for_app(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,