From 8a09303c98e0abb847b6d12c61a647ec4ef739e9 Mon Sep 17 00:00:00 2001 From: Franck Nijhof Date: Wed, 9 Feb 2022 22:03:15 +0100 Subject: [PATCH] Extract Spotify media browsing into a module (#66175) --- .coveragerc | 2 + homeassistant/components/spotify/__init__.py | 40 +- .../components/spotify/browse_media.py | 439 ++++++++++++++++++ homeassistant/components/spotify/const.py | 17 + .../components/spotify/media_player.py | 411 +--------------- homeassistant/components/spotify/util.py | 24 + 6 files changed, 493 insertions(+), 440 deletions(-) create mode 100644 homeassistant/components/spotify/browse_media.py create mode 100644 homeassistant/components/spotify/util.py diff --git a/.coveragerc b/.coveragerc index 61dccdb2d8b..b64c08cc1af 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1078,8 +1078,10 @@ omit = homeassistant/components/spider/* homeassistant/components/splunk/* homeassistant/components/spotify/__init__.py + homeassistant/components/spotify/browse_media.py homeassistant/components/spotify/media_player.py homeassistant/components/spotify/system_health.py + homeassistant/components/spotify/util.py homeassistant/components/squeezebox/__init__.py homeassistant/components/squeezebox/browse_media.py homeassistant/components/squeezebox/media_player.py diff --git a/homeassistant/components/spotify/__init__.py b/homeassistant/components/spotify/__init__.py index a2a1fee50f2..5599965a2a6 100644 --- a/homeassistant/components/spotify/__init__.py +++ b/homeassistant/components/spotify/__init__.py @@ -4,7 +4,6 @@ import aiohttp from spotipy import Spotify, SpotifyException import voluptuous as vol -from homeassistant.components.media_player import BrowseError, BrowseMedia from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_CREDENTIALS, @@ -22,15 +21,15 @@ from homeassistant.helpers.config_entry_oauth2_flow import ( from homeassistant.helpers.typing import ConfigType from . import config_flow +from .browse_media import async_browse_media from .const import ( DATA_SPOTIFY_CLIENT, DATA_SPOTIFY_ME, DATA_SPOTIFY_SESSION, DOMAIN, - MEDIA_PLAYER_PREFIX, SPOTIFY_SCOPES, ) -from .media_player import async_browse_media_internal +from .util import is_spotify_media_type, resolve_spotify_media_type CONFIG_SCHEMA = vol.Schema( { @@ -47,35 +46,12 @@ CONFIG_SCHEMA = vol.Schema( PLATFORMS = [Platform.MEDIA_PLAYER] -def is_spotify_media_type(media_content_type: str) -> bool: - """Return whether the media_content_type is a valid Spotify media_id.""" - return media_content_type.startswith(MEDIA_PLAYER_PREFIX) - - -def resolve_spotify_media_type(media_content_type: str) -> str: - """Return actual spotify media_content_type.""" - return media_content_type[len(MEDIA_PLAYER_PREFIX) :] - - -async def async_browse_media( - hass: HomeAssistant, - media_content_type: str, - media_content_id: str, - *, - can_play_artist: bool = True, -) -> BrowseMedia: - """Browse Spotify media.""" - if not (info := next(iter(hass.data[DOMAIN].values()), None)): - raise BrowseError("No Spotify accounts available") - return await async_browse_media_internal( - hass, - info[DATA_SPOTIFY_CLIENT], - info[DATA_SPOTIFY_SESSION], - info[DATA_SPOTIFY_ME], - media_content_type, - media_content_id, - can_play_artist=can_play_artist, - ) +__all__ = [ + "async_browse_media", + "DOMAIN", + "is_spotify_media_type", + "resolve_spotify_media_type", +] async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: diff --git a/homeassistant/components/spotify/browse_media.py b/homeassistant/components/spotify/browse_media.py new file mode 100644 index 00000000000..0ffdfd6ace6 --- /dev/null +++ b/homeassistant/components/spotify/browse_media.py @@ -0,0 +1,439 @@ +"""Support for Spotify media browsing.""" +from __future__ import annotations + +from functools import partial +import logging +from typing import Any + +from spotipy import Spotify + +from homeassistant.backports.enum import StrEnum +from homeassistant.components.media_player import BrowseError, BrowseMedia +from homeassistant.components.media_player.const import ( + MEDIA_CLASS_ALBUM, + MEDIA_CLASS_ARTIST, + MEDIA_CLASS_DIRECTORY, + MEDIA_CLASS_EPISODE, + MEDIA_CLASS_GENRE, + MEDIA_CLASS_PLAYLIST, + MEDIA_CLASS_PODCAST, + MEDIA_CLASS_TRACK, + MEDIA_TYPE_ALBUM, + MEDIA_TYPE_ARTIST, + MEDIA_TYPE_EPISODE, + MEDIA_TYPE_PLAYLIST, + MEDIA_TYPE_TRACK, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.config_entry_oauth2_flow import OAuth2Session + +from .const import ( + DATA_SPOTIFY_CLIENT, + DATA_SPOTIFY_ME, + DATA_SPOTIFY_SESSION, + DOMAIN, + MEDIA_PLAYER_PREFIX, + MEDIA_TYPE_SHOW, + PLAYABLE_MEDIA_TYPES, +) +from .util import fetch_image_url + +BROWSE_LIMIT = 48 + + +_LOGGER = logging.getLogger(__name__) + + +class BrowsableMedia(StrEnum): + """Enum of browsable media.""" + + CURRENT_USER_PLAYLISTS = "current_user_playlists" + CURRENT_USER_FOLLOWED_ARTISTS = "current_user_followed_artists" + CURRENT_USER_SAVED_ALBUMS = "current_user_saved_albums" + CURRENT_USER_SAVED_TRACKS = "current_user_saved_tracks" + CURRENT_USER_SAVED_SHOWS = "current_user_saved_shows" + CURRENT_USER_RECENTLY_PLAYED = "current_user_recently_played" + CURRENT_USER_TOP_ARTISTS = "current_user_top_artists" + CURRENT_USER_TOP_TRACKS = "current_user_top_tracks" + CATEGORIES = "categories" + FEATURED_PLAYLISTS = "featured_playlists" + NEW_RELEASES = "new_releases" + + +LIBRARY_MAP = { + BrowsableMedia.CURRENT_USER_PLAYLISTS.value: "Playlists", + BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: "Artists", + BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: "Albums", + BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: "Tracks", + BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: "Podcasts", + BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: "Recently played", + BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: "Top Artists", + BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: "Top Tracks", + BrowsableMedia.CATEGORIES.value: "Categories", + BrowsableMedia.FEATURED_PLAYLISTS.value: "Featured Playlists", + BrowsableMedia.NEW_RELEASES.value: "New Releases", +} + +CONTENT_TYPE_MEDIA_CLASS: dict[str, Any] = { + BrowsableMedia.CURRENT_USER_PLAYLISTS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_PLAYLIST, + }, + BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_ARTIST, + }, + BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_ALBUM, + }, + BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_TRACK, + }, + BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_PODCAST, + }, + BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_TRACK, + }, + BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_ARTIST, + }, + BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_TRACK, + }, + BrowsableMedia.FEATURED_PLAYLISTS.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_PLAYLIST, + }, + BrowsableMedia.CATEGORIES.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_GENRE, + }, + "category_playlists": { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_PLAYLIST, + }, + BrowsableMedia.NEW_RELEASES.value: { + "parent": MEDIA_CLASS_DIRECTORY, + "children": MEDIA_CLASS_ALBUM, + }, + MEDIA_TYPE_PLAYLIST: { + "parent": MEDIA_CLASS_PLAYLIST, + "children": MEDIA_CLASS_TRACK, + }, + MEDIA_TYPE_ALBUM: {"parent": MEDIA_CLASS_ALBUM, "children": MEDIA_CLASS_TRACK}, + MEDIA_TYPE_ARTIST: {"parent": MEDIA_CLASS_ARTIST, "children": MEDIA_CLASS_ALBUM}, + MEDIA_TYPE_EPISODE: {"parent": MEDIA_CLASS_EPISODE, "children": None}, + MEDIA_TYPE_SHOW: {"parent": MEDIA_CLASS_PODCAST, "children": MEDIA_CLASS_EPISODE}, + MEDIA_TYPE_TRACK: {"parent": MEDIA_CLASS_TRACK, "children": None}, +} + + +class MissingMediaInformation(BrowseError): + """Missing media required information.""" + + +class UnknownMediaType(BrowseError): + """Unknown media type.""" + + +async def async_browse_media( + hass: HomeAssistant, + media_content_type: str, + media_content_id: str, + *, + can_play_artist: bool = True, +) -> BrowseMedia: + """Browse Spotify media.""" + if not (info := next(iter(hass.data[DOMAIN].values()), None)): + raise BrowseError("No Spotify accounts available") + return await async_browse_media_internal( + hass, + info[DATA_SPOTIFY_CLIENT], + info[DATA_SPOTIFY_SESSION], + info[DATA_SPOTIFY_ME], + media_content_type, + media_content_id, + can_play_artist=can_play_artist, + ) + + +async def async_browse_media_internal( + hass: HomeAssistant, + spotify: Spotify, + session: OAuth2Session, + current_user: dict[str, Any], + media_content_type: str | None, + media_content_id: str | None, + *, + can_play_artist: bool = True, +) -> BrowseMedia: + """Browse spotify media.""" + if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"): + return await hass.async_add_executor_job( + partial(library_payload, can_play_artist=can_play_artist) + ) + + await session.async_ensure_token_valid() + + # Strip prefix + if media_content_type: + media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :] + + payload = { + "media_content_type": media_content_type, + "media_content_id": media_content_id, + } + response = await hass.async_add_executor_job( + partial( + build_item_response, + spotify, + current_user, + payload, + can_play_artist=can_play_artist, + ) + ) + if response is None: + raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}") + return response + + +def build_item_response( # noqa: C901 + spotify: Spotify, + user: dict[str, Any], + payload: dict[str, str | None], + *, + can_play_artist: bool, +) -> BrowseMedia | None: + """Create response payload for the provided media query.""" + media_content_type = payload["media_content_type"] + media_content_id = payload["media_content_id"] + + if media_content_type is None or media_content_id is None: + return None + + title = None + image = None + media: dict[str, Any] | None = None + items = [] + + if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS: + if media := spotify.current_user_playlists(limit=BROWSE_LIMIT): + items = media.get("items", []) + elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: + if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT): + items = media.get("artists", {}).get("items", []) + elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: + if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT): + items = [item["album"] for item in media.get("items", [])] + elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS: + if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT): + items = [item["track"] for item in media.get("items", [])] + elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS: + if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT): + items = [item["show"] for item in media.get("items", [])] + elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: + if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT): + items = [item["track"] for item in media.get("items", [])] + elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS: + if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT): + items = media.get("items", []) + elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS: + if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT): + items = media.get("items", []) + elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS: + if media := spotify.featured_playlists( + country=user["country"], limit=BROWSE_LIMIT + ): + items = media.get("playlists", {}).get("items", []) + elif media_content_type == BrowsableMedia.CATEGORIES: + if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT): + items = media.get("categories", {}).get("items", []) + elif media_content_type == "category_playlists": + if ( + media := spotify.category_playlists( + category_id=media_content_id, + country=user["country"], + limit=BROWSE_LIMIT, + ) + ) and (category := spotify.category(media_content_id, country=user["country"])): + title = category.get("name") + image = fetch_image_url(category, key="icons") + items = media.get("playlists", {}).get("items", []) + elif media_content_type == BrowsableMedia.NEW_RELEASES: + if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT): + items = media.get("albums", {}).get("items", []) + elif media_content_type == MEDIA_TYPE_PLAYLIST: + if media := spotify.playlist(media_content_id): + items = [item["track"] for item in media.get("tracks", {}).get("items", [])] + elif media_content_type == MEDIA_TYPE_ALBUM: + if media := spotify.album(media_content_id): + items = media.get("tracks", {}).get("items", []) + elif media_content_type == MEDIA_TYPE_ARTIST: + if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and ( + artist := spotify.artist(media_content_id) + ): + title = artist.get("name") + image = fetch_image_url(artist) + items = media.get("items", []) + elif media_content_type == MEDIA_TYPE_SHOW: + if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and ( + show := spotify.show(media_content_id) + ): + title = show.get("name") + image = fetch_image_url(show) + items = media.get("items", []) + + if media is None: + return None + + try: + media_class = CONTENT_TYPE_MEDIA_CLASS[media_content_type] + except KeyError: + _LOGGER.debug("Unknown media type received: %s", media_content_type) + return None + + if media_content_type == BrowsableMedia.CATEGORIES: + media_item = BrowseMedia( + can_expand=True, + can_play=False, + children_media_class=media_class["children"], + media_class=media_class["parent"], + media_content_id=media_content_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}", + title=LIBRARY_MAP.get(media_content_id, "Unknown"), + ) + + media_item.children = [] + for item in items: + try: + item_id = item["id"] + except KeyError: + _LOGGER.debug("Missing ID for media item: %s", item) + continue + media_item.children.append( + BrowseMedia( + can_expand=True, + can_play=False, + children_media_class=MEDIA_CLASS_TRACK, + media_class=MEDIA_CLASS_PLAYLIST, + media_content_id=item_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists", + thumbnail=fetch_image_url(item, key="icons"), + title=item.get("name"), + ) + ) + return media_item + + if title is None: + title = LIBRARY_MAP.get(media_content_id, "Unknown") + if "name" in media: + title = media["name"] + + can_play = media_content_type in PLAYABLE_MEDIA_TYPES and ( + media_content_type != MEDIA_TYPE_ARTIST or can_play_artist + ) + + browse_media = BrowseMedia( + can_expand=True, + can_play=can_play, + children_media_class=media_class["children"], + media_class=media_class["parent"], + media_content_id=media_content_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}", + thumbnail=image, + title=title, + ) + + browse_media.children = [] + for item in items: + try: + browse_media.children.append( + item_payload(item, can_play_artist=can_play_artist) + ) + except (MissingMediaInformation, UnknownMediaType): + continue + + if "images" in media: + browse_media.thumbnail = fetch_image_url(media) + + return browse_media + + +def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia: + """ + Create response payload for a single media item. + + Used by async_browse_media. + """ + try: + media_type = item["type"] + media_id = item["uri"] + except KeyError as err: + _LOGGER.debug("Missing type or URI for media item: %s", item) + raise MissingMediaInformation from err + + try: + media_class = CONTENT_TYPE_MEDIA_CLASS[media_type] + except KeyError as err: + _LOGGER.debug("Unknown media type received: %s", media_type) + raise UnknownMediaType from err + + can_expand = media_type not in [ + MEDIA_TYPE_TRACK, + MEDIA_TYPE_EPISODE, + ] + + can_play = media_type in PLAYABLE_MEDIA_TYPES and ( + media_type != MEDIA_TYPE_ARTIST or can_play_artist + ) + + browse_media = BrowseMedia( + can_expand=can_expand, + can_play=can_play, + children_media_class=media_class["children"], + media_class=media_class["parent"], + media_content_id=media_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}", + title=item.get("name", "Unknown"), + ) + + if "images" in item: + browse_media.thumbnail = fetch_image_url(item) + elif MEDIA_TYPE_ALBUM in item: + browse_media.thumbnail = fetch_image_url(item[MEDIA_TYPE_ALBUM]) + + return browse_media + + +def library_payload(*, can_play_artist: bool) -> BrowseMedia: + """ + Create response payload to describe contents of a specific library. + + Used by async_browse_media. + """ + browse_media = BrowseMedia( + can_expand=True, + can_play=False, + children_media_class=MEDIA_CLASS_DIRECTORY, + media_class=MEDIA_CLASS_DIRECTORY, + media_content_id="library", + media_content_type=f"{MEDIA_PLAYER_PREFIX}library", + title="Media Library", + ) + + browse_media.children = [] + for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]: + browse_media.children.append( + item_payload( + {"name": item["name"], "type": item["type"], "uri": item["type"]}, + can_play_artist=can_play_artist, + ) + ) + return browse_media diff --git a/homeassistant/components/spotify/const.py b/homeassistant/components/spotify/const.py index 7978ac8712f..6e54ed21ec1 100644 --- a/homeassistant/components/spotify/const.py +++ b/homeassistant/components/spotify/const.py @@ -1,4 +1,11 @@ """Define constants for the Spotify integration.""" +from homeassistant.components.media_player.const import ( + MEDIA_TYPE_ALBUM, + MEDIA_TYPE_ARTIST, + MEDIA_TYPE_EPISODE, + MEDIA_TYPE_PLAYLIST, + MEDIA_TYPE_TRACK, +) DOMAIN = "spotify" @@ -24,3 +31,13 @@ SPOTIFY_SCOPES = [ ] MEDIA_PLAYER_PREFIX = "spotify://" +MEDIA_TYPE_SHOW = "show" + +PLAYABLE_MEDIA_TYPES = [ + MEDIA_TYPE_PLAYLIST, + MEDIA_TYPE_ALBUM, + MEDIA_TYPE_ARTIST, + MEDIA_TYPE_EPISODE, + MEDIA_TYPE_SHOW, + MEDIA_TYPE_TRACK, +] diff --git a/homeassistant/components/spotify/media_player.py b/homeassistant/components/spotify/media_player.py index bdb0ea8b959..58f581fdf82 100644 --- a/homeassistant/components/spotify/media_player.py +++ b/homeassistant/components/spotify/media_player.py @@ -4,7 +4,6 @@ from __future__ import annotations from asyncio import run_coroutine_threadsafe import datetime as dt from datetime import timedelta -from functools import partial import logging from typing import Any @@ -12,19 +11,8 @@ import requests from spotipy import Spotify, SpotifyException from yarl import URL -from homeassistant.backports.enum import StrEnum from homeassistant.components.media_player import BrowseMedia, MediaPlayerEntity from homeassistant.components.media_player.const import ( - MEDIA_CLASS_ALBUM, - MEDIA_CLASS_ARTIST, - MEDIA_CLASS_DIRECTORY, - MEDIA_CLASS_EPISODE, - MEDIA_CLASS_GENRE, - MEDIA_CLASS_PLAYLIST, - MEDIA_CLASS_PODCAST, - MEDIA_CLASS_TRACK, - MEDIA_TYPE_ALBUM, - MEDIA_TYPE_ARTIST, MEDIA_TYPE_EPISODE, MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST, @@ -44,7 +32,6 @@ from homeassistant.components.media_player.const import ( SUPPORT_SHUFFLE_SET, SUPPORT_VOLUME_SET, ) -from homeassistant.components.media_player.errors import BrowseError from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_ID, @@ -61,14 +48,17 @@ from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.util.dt import utc_from_timestamp +from .browse_media import async_browse_media_internal from .const import ( DATA_SPOTIFY_CLIENT, DATA_SPOTIFY_ME, DATA_SPOTIFY_SESSION, DOMAIN, MEDIA_PLAYER_PREFIX, + PLAYABLE_MEDIA_TYPES, SPOTIFY_SCOPES, ) +from .util import fetch_image_url _LOGGER = logging.getLogger(__name__) @@ -98,118 +88,6 @@ REPEAT_MODE_MAPPING_TO_SPOTIFY = { value: key for key, value in REPEAT_MODE_MAPPING_TO_HA.items() } -BROWSE_LIMIT = 48 - -MEDIA_TYPE_SHOW = "show" - -PLAYABLE_MEDIA_TYPES = [ - MEDIA_TYPE_PLAYLIST, - MEDIA_TYPE_ALBUM, - MEDIA_TYPE_ARTIST, - MEDIA_TYPE_EPISODE, - MEDIA_TYPE_SHOW, - MEDIA_TYPE_TRACK, -] - - -class BrowsableMedia(StrEnum): - """Enum of browsable media.""" - - CURRENT_USER_PLAYLISTS = "current_user_playlists" - CURRENT_USER_FOLLOWED_ARTISTS = "current_user_followed_artists" - CURRENT_USER_SAVED_ALBUMS = "current_user_saved_albums" - CURRENT_USER_SAVED_TRACKS = "current_user_saved_tracks" - CURRENT_USER_SAVED_SHOWS = "current_user_saved_shows" - CURRENT_USER_RECENTLY_PLAYED = "current_user_recently_played" - CURRENT_USER_TOP_ARTISTS = "current_user_top_artists" - CURRENT_USER_TOP_TRACKS = "current_user_top_tracks" - CATEGORIES = "categories" - FEATURED_PLAYLISTS = "featured_playlists" - NEW_RELEASES = "new_releases" - - -LIBRARY_MAP = { - BrowsableMedia.CURRENT_USER_PLAYLISTS.value: "Playlists", - BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: "Artists", - BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: "Albums", - BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: "Tracks", - BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: "Podcasts", - BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: "Recently played", - BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: "Top Artists", - BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: "Top Tracks", - BrowsableMedia.CATEGORIES.value: "Categories", - BrowsableMedia.FEATURED_PLAYLISTS.value: "Featured Playlists", - BrowsableMedia.NEW_RELEASES.value: "New Releases", -} - -CONTENT_TYPE_MEDIA_CLASS: dict[str, Any] = { - BrowsableMedia.CURRENT_USER_PLAYLISTS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_PLAYLIST, - }, - BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_ARTIST, - }, - BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_ALBUM, - }, - BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_TRACK, - }, - BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_PODCAST, - }, - BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_TRACK, - }, - BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_ARTIST, - }, - BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_TRACK, - }, - BrowsableMedia.FEATURED_PLAYLISTS.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_PLAYLIST, - }, - BrowsableMedia.CATEGORIES.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_GENRE, - }, - "category_playlists": { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_PLAYLIST, - }, - BrowsableMedia.NEW_RELEASES.value: { - "parent": MEDIA_CLASS_DIRECTORY, - "children": MEDIA_CLASS_ALBUM, - }, - MEDIA_TYPE_PLAYLIST: { - "parent": MEDIA_CLASS_PLAYLIST, - "children": MEDIA_CLASS_TRACK, - }, - MEDIA_TYPE_ALBUM: {"parent": MEDIA_CLASS_ALBUM, "children": MEDIA_CLASS_TRACK}, - MEDIA_TYPE_ARTIST: {"parent": MEDIA_CLASS_ARTIST, "children": MEDIA_CLASS_ALBUM}, - MEDIA_TYPE_EPISODE: {"parent": MEDIA_CLASS_EPISODE, "children": None}, - MEDIA_TYPE_SHOW: {"parent": MEDIA_CLASS_PODCAST, "children": MEDIA_CLASS_EPISODE}, - MEDIA_TYPE_TRACK: {"parent": MEDIA_CLASS_TRACK, "children": None}, -} - - -class MissingMediaInformation(BrowseError): - """Missing media required information.""" - - -class UnknownMediaType(BrowseError): - """Unknown media type.""" - async def async_setup_entry( hass: HomeAssistant, @@ -572,286 +450,3 @@ class SpotifyMediaPlayer(MediaPlayerEntity): media_content_type, media_content_id, ) - - -async def async_browse_media_internal( - hass: HomeAssistant, - spotify: Spotify, - session: OAuth2Session, - current_user: dict[str, Any], - media_content_type: str | None, - media_content_id: str | None, - *, - can_play_artist: bool = True, -) -> BrowseMedia: - """Browse spotify media.""" - if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"): - return await hass.async_add_executor_job( - partial(library_payload, can_play_artist=can_play_artist) - ) - - await session.async_ensure_token_valid() - - # Strip prefix - if media_content_type: - media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :] - - payload = { - "media_content_type": media_content_type, - "media_content_id": media_content_id, - } - response = await hass.async_add_executor_job( - partial( - build_item_response, - spotify, - current_user, - payload, - can_play_artist=can_play_artist, - ) - ) - if response is None: - raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}") - return response - - -def build_item_response( # noqa: C901 - spotify: Spotify, - user: dict[str, Any], - payload: dict[str, str | None], - *, - can_play_artist: bool, -) -> BrowseMedia | None: - """Create response payload for the provided media query.""" - media_content_type = payload["media_content_type"] - media_content_id = payload["media_content_id"] - - if media_content_type is None or media_content_id is None: - return None - - title = None - image = None - media: dict[str, Any] | None = None - items = [] - - if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS: - if media := spotify.current_user_playlists(limit=BROWSE_LIMIT): - items = media.get("items", []) - elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: - if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT): - items = media.get("artists", {}).get("items", []) - elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: - if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT): - items = [item["album"] for item in media.get("items", [])] - elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS: - if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT): - items = [item["track"] for item in media.get("items", [])] - elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS: - if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT): - items = [item["show"] for item in media.get("items", [])] - elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: - if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT): - items = [item["track"] for item in media.get("items", [])] - elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS: - if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT): - items = media.get("items", []) - elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS: - if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT): - items = media.get("items", []) - elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS: - if media := spotify.featured_playlists( - country=user["country"], limit=BROWSE_LIMIT - ): - items = media.get("playlists", {}).get("items", []) - elif media_content_type == BrowsableMedia.CATEGORIES: - if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT): - items = media.get("categories", {}).get("items", []) - elif media_content_type == "category_playlists": - if ( - media := spotify.category_playlists( - category_id=media_content_id, - country=user["country"], - limit=BROWSE_LIMIT, - ) - ) and (category := spotify.category(media_content_id, country=user["country"])): - title = category.get("name") - image = fetch_image_url(category, key="icons") - items = media.get("playlists", {}).get("items", []) - elif media_content_type == BrowsableMedia.NEW_RELEASES: - if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT): - items = media.get("albums", {}).get("items", []) - elif media_content_type == MEDIA_TYPE_PLAYLIST: - if media := spotify.playlist(media_content_id): - items = [item["track"] for item in media.get("tracks", {}).get("items", [])] - elif media_content_type == MEDIA_TYPE_ALBUM: - if media := spotify.album(media_content_id): - items = media.get("tracks", {}).get("items", []) - elif media_content_type == MEDIA_TYPE_ARTIST: - if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and ( - artist := spotify.artist(media_content_id) - ): - title = artist.get("name") - image = fetch_image_url(artist) - items = media.get("items", []) - elif media_content_type == MEDIA_TYPE_SHOW: - if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and ( - show := spotify.show(media_content_id) - ): - title = show.get("name") - image = fetch_image_url(show) - items = media.get("items", []) - - if media is None: - return None - - try: - media_class = CONTENT_TYPE_MEDIA_CLASS[media_content_type] - except KeyError: - _LOGGER.debug("Unknown media type received: %s", media_content_type) - return None - - if media_content_type == BrowsableMedia.CATEGORIES: - media_item = BrowseMedia( - can_expand=True, - can_play=False, - children_media_class=media_class["children"], - media_class=media_class["parent"], - media_content_id=media_content_id, - media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}", - title=LIBRARY_MAP.get(media_content_id, "Unknown"), - ) - - media_item.children = [] - for item in items: - try: - item_id = item["id"] - except KeyError: - _LOGGER.debug("Missing ID for media item: %s", item) - continue - media_item.children.append( - BrowseMedia( - can_expand=True, - can_play=False, - children_media_class=MEDIA_CLASS_TRACK, - media_class=MEDIA_CLASS_PLAYLIST, - media_content_id=item_id, - media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists", - thumbnail=fetch_image_url(item, key="icons"), - title=item.get("name"), - ) - ) - return media_item - - if title is None: - title = LIBRARY_MAP.get(media_content_id, "Unknown") - if "name" in media: - title = media["name"] - - can_play = media_content_type in PLAYABLE_MEDIA_TYPES and ( - media_content_type != MEDIA_TYPE_ARTIST or can_play_artist - ) - - browse_media = BrowseMedia( - can_expand=True, - can_play=can_play, - children_media_class=media_class["children"], - media_class=media_class["parent"], - media_content_id=media_content_id, - media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}", - thumbnail=image, - title=title, - ) - - browse_media.children = [] - for item in items: - try: - browse_media.children.append( - item_payload(item, can_play_artist=can_play_artist) - ) - except (MissingMediaInformation, UnknownMediaType): - continue - - if "images" in media: - browse_media.thumbnail = fetch_image_url(media) - - return browse_media - - -def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia: - """ - Create response payload for a single media item. - - Used by async_browse_media. - """ - try: - media_type = item["type"] - media_id = item["uri"] - except KeyError as err: - _LOGGER.debug("Missing type or URI for media item: %s", item) - raise MissingMediaInformation from err - - try: - media_class = CONTENT_TYPE_MEDIA_CLASS[media_type] - except KeyError as err: - _LOGGER.debug("Unknown media type received: %s", media_type) - raise UnknownMediaType from err - - can_expand = media_type not in [ - MEDIA_TYPE_TRACK, - MEDIA_TYPE_EPISODE, - ] - - can_play = media_type in PLAYABLE_MEDIA_TYPES and ( - media_type != MEDIA_TYPE_ARTIST or can_play_artist - ) - - browse_media = BrowseMedia( - can_expand=can_expand, - can_play=can_play, - children_media_class=media_class["children"], - media_class=media_class["parent"], - media_content_id=media_id, - media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}", - title=item.get("name", "Unknown"), - ) - - if "images" in item: - browse_media.thumbnail = fetch_image_url(item) - elif MEDIA_TYPE_ALBUM in item: - browse_media.thumbnail = fetch_image_url(item[MEDIA_TYPE_ALBUM]) - - return browse_media - - -def library_payload(*, can_play_artist: bool) -> BrowseMedia: - """ - Create response payload to describe contents of a specific library. - - Used by async_browse_media. - """ - browse_media = BrowseMedia( - can_expand=True, - can_play=False, - children_media_class=MEDIA_CLASS_DIRECTORY, - media_class=MEDIA_CLASS_DIRECTORY, - media_content_id="library", - media_content_type=f"{MEDIA_PLAYER_PREFIX}library", - title="Media Library", - ) - - browse_media.children = [] - for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]: - browse_media.children.append( - item_payload( - {"name": item["name"], "type": item["type"], "uri": item["type"]}, - can_play_artist=can_play_artist, - ) - ) - return browse_media - - -def fetch_image_url(item: dict[str, Any], key="images") -> str | None: - """Fetch image url.""" - try: - return item.get(key, [])[0].get("url") - except IndexError: - return None diff --git a/homeassistant/components/spotify/util.py b/homeassistant/components/spotify/util.py new file mode 100644 index 00000000000..cdb8e933523 --- /dev/null +++ b/homeassistant/components/spotify/util.py @@ -0,0 +1,24 @@ +"""Utils for Spotify.""" +from __future__ import annotations + +from typing import Any + +from .const import MEDIA_PLAYER_PREFIX + + +def is_spotify_media_type(media_content_type: str) -> bool: + """Return whether the media_content_type is a valid Spotify media_id.""" + return media_content_type.startswith(MEDIA_PLAYER_PREFIX) + + +def resolve_spotify_media_type(media_content_type: str) -> str: + """Return actual spotify media_content_type.""" + return media_content_type[len(MEDIA_PLAYER_PREFIX) :] + + +def fetch_image_url(item: dict[str, Any], key="images") -> str | None: + """Fetch image url.""" + try: + return item.get(key, [])[0].get("url") + except IndexError: + return None