diff --git a/homeassistant/components/spotify/__init__.py b/homeassistant/components/spotify/__init__.py index 5c36a0c71c3..a2a1fee50f2 100644 --- a/homeassistant/components/spotify/__init__.py +++ b/homeassistant/components/spotify/__init__.py @@ -4,7 +4,7 @@ import aiohttp from spotipy import Spotify, SpotifyException import voluptuous as vol -from homeassistant.components.media_player import BrowseError +from homeassistant.components.media_player import BrowseError, BrowseMedia from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_CREDENTIALS, @@ -47,19 +47,23 @@ CONFIG_SCHEMA = vol.Schema( PLATFORMS = [Platform.MEDIA_PLAYER] -def is_spotify_media_type(media_content_type): +def is_spotify_media_type(media_content_type: str) -> bool: """Return whether the media_content_type is a valid Spotify media_id.""" return media_content_type.startswith(MEDIA_PLAYER_PREFIX) -def resolve_spotify_media_type(media_content_type): +def resolve_spotify_media_type(media_content_type: str) -> str: """Return actual spotify media_content_type.""" return media_content_type[len(MEDIA_PLAYER_PREFIX) :] async def async_browse_media( - hass, media_content_type, media_content_id, *, can_play_artist=True -): + hass: HomeAssistant, + media_content_type: str, + media_content_id: str, + *, + can_play_artist: bool = True, +) -> BrowseMedia: """Browse Spotify media.""" if not (info := next(iter(hass.data[DOMAIN].values()), None)): raise BrowseError("No Spotify accounts available") @@ -128,12 +132,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Unload Spotify config entry.""" - # Unload entities for this entry/device. - unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) - - # Cleanup - del hass.data[DOMAIN][entry.entry_id] - if not hass.data[DOMAIN]: - del hass.data[DOMAIN] - + if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS): + del hass.data[DOMAIN][entry.entry_id] return unload_ok diff --git a/homeassistant/components/spotify/config_flow.py b/homeassistant/components/spotify/config_flow.py index 33e5e67a244..bd01fa64acb 100644 --- a/homeassistant/components/spotify/config_flow.py +++ b/homeassistant/components/spotify/config_flow.py @@ -8,6 +8,7 @@ from spotipy import Spotify import voluptuous as vol from homeassistant.components import persistent_notification +from homeassistant.config_entries import ConfigEntry from homeassistant.data_entry_flow import FlowResult from homeassistant.helpers import config_entry_oauth2_flow @@ -22,10 +23,7 @@ class SpotifyFlowHandler( DOMAIN = DOMAIN VERSION = 1 - def __init__(self) -> None: - """Instantiate config flow.""" - super().__init__() - self.entry: dict[str, Any] | None = None + reauth_entry: ConfigEntry | None = None @property def logger(self) -> logging.Logger: @@ -48,7 +46,7 @@ class SpotifyFlowHandler( name = data["id"] = current_user["id"] - if self.entry and self.entry["id"] != current_user["id"]: + if self.reauth_entry and self.reauth_entry.data["id"] != current_user["id"]: return self.async_abort(reason="reauth_account_mismatch") if current_user.get("display_name"): @@ -61,8 +59,9 @@ class SpotifyFlowHandler( async def async_step_reauth(self, entry: dict[str, Any]) -> FlowResult: """Perform reauth upon migration of old entries.""" - if entry: - self.entry = entry + self.reauth_entry = self.hass.config_entries.async_get_entry( + self.context["entry_id"] + ) persistent_notification.async_create( self.hass, @@ -77,16 +76,18 @@ class SpotifyFlowHandler( self, user_input: dict[str, Any] | None = None ) -> FlowResult: """Confirm reauth dialog.""" - if user_input is None: + if self.reauth_entry is None: + return self.async_abort(reason="reauth_account_mismatch") + + if user_input is None and self.reauth_entry: return self.async_show_form( step_id="reauth_confirm", - description_placeholders={"account": self.entry["id"]}, + description_placeholders={"account": self.reauth_entry.data["id"]}, data_schema=vol.Schema({}), errors={}, ) persistent_notification.async_dismiss(self.hass, "spotify_reauth") - return await self.async_step_pick_implementation( - user_input={"implementation": self.entry["auth_implementation"]} + user_input={"implementation": self.reauth_entry.data["auth_implementation"]} ) diff --git a/homeassistant/components/spotify/media_player.py b/homeassistant/components/spotify/media_player.py index e279b150883..bdb0ea8b959 100644 --- a/homeassistant/components/spotify/media_player.py +++ b/homeassistant/components/spotify/media_player.py @@ -6,6 +6,7 @@ import datetime as dt from datetime import timedelta from functools import partial import logging +from typing import Any import requests from spotipy import Spotify, SpotifyException @@ -128,57 +129,57 @@ class BrowsableMedia(StrEnum): LIBRARY_MAP = { - BrowsableMedia.CURRENT_USER_PLAYLISTS: "Playlists", - BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: "Artists", - BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: "Albums", - BrowsableMedia.CURRENT_USER_SAVED_TRACKS: "Tracks", - BrowsableMedia.CURRENT_USER_SAVED_SHOWS: "Podcasts", - BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: "Recently played", - BrowsableMedia.CURRENT_USER_TOP_ARTISTS: "Top Artists", - BrowsableMedia.CURRENT_USER_TOP_TRACKS: "Top Tracks", - BrowsableMedia.CATEGORIES: "Categories", - BrowsableMedia.FEATURED_PLAYLISTS: "Featured Playlists", - BrowsableMedia.NEW_RELEASES: "New Releases", + BrowsableMedia.CURRENT_USER_PLAYLISTS.value: "Playlists", + BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: "Artists", + BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: "Albums", + BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: "Tracks", + BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: "Podcasts", + BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: "Recently played", + BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: "Top Artists", + BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: "Top Tracks", + BrowsableMedia.CATEGORIES.value: "Categories", + BrowsableMedia.FEATURED_PLAYLISTS.value: "Featured Playlists", + BrowsableMedia.NEW_RELEASES.value: "New Releases", } -CONTENT_TYPE_MEDIA_CLASS = { - BrowsableMedia.CURRENT_USER_PLAYLISTS: { +CONTENT_TYPE_MEDIA_CLASS: dict[str, Any] = { + BrowsableMedia.CURRENT_USER_PLAYLISTS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_PLAYLIST, }, - BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: { + BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_ARTIST, }, - BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: { + BrowsableMedia.CURRENT_USER_SAVED_ALBUMS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_ALBUM, }, - BrowsableMedia.CURRENT_USER_SAVED_TRACKS: { + BrowsableMedia.CURRENT_USER_SAVED_TRACKS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_TRACK, }, - BrowsableMedia.CURRENT_USER_SAVED_SHOWS: { + BrowsableMedia.CURRENT_USER_SAVED_SHOWS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_PODCAST, }, - BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: { + BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_TRACK, }, - BrowsableMedia.CURRENT_USER_TOP_ARTISTS: { + BrowsableMedia.CURRENT_USER_TOP_ARTISTS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_ARTIST, }, - BrowsableMedia.CURRENT_USER_TOP_TRACKS: { + BrowsableMedia.CURRENT_USER_TOP_TRACKS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_TRACK, }, - BrowsableMedia.FEATURED_PLAYLISTS: { + BrowsableMedia.FEATURED_PLAYLISTS.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_PLAYLIST, }, - BrowsableMedia.CATEGORIES: { + BrowsableMedia.CATEGORIES.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_GENRE, }, @@ -186,7 +187,7 @@ CONTENT_TYPE_MEDIA_CLASS = { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_PLAYLIST, }, - BrowsableMedia.NEW_RELEASES: { + BrowsableMedia.NEW_RELEASES.value: { "parent": MEDIA_CLASS_DIRECTORY, "children": MEDIA_CLASS_ALBUM, }, @@ -276,7 +277,7 @@ class SpotifyMediaPlayer(MediaPlayerEntity): self._attr_unique_id = user_id @property - def _me(self) -> dict: + def _me(self) -> dict[str, Any]: """Return spotify user info.""" return self._spotify_data[DATA_SPOTIFY_ME] @@ -319,23 +320,30 @@ class SpotifyMediaPlayer(MediaPlayerEntity): @property def volume_level(self) -> float | None: """Return the device volume.""" + if not self._currently_playing: + return None return self._currently_playing.get("device", {}).get("volume_percent", 0) / 100 @property def media_content_id(self) -> str | None: """Return the media URL.""" + if not self._currently_playing: + return None item = self._currently_playing.get("item") or {} return item.get("uri") @property def media_duration(self) -> int | None: """Duration of current playing media in seconds.""" - if self._currently_playing.get("item") is None: + if ( + self._currently_playing is None + or self._currently_playing.get("item") is None + ): return None return self._currently_playing["item"]["duration_ms"] / 1000 @property - def media_position(self) -> str | None: + def media_position(self) -> int | None: """Position of current playing media in seconds.""" if not self._currently_playing: return None @@ -352,7 +360,8 @@ class SpotifyMediaPlayer(MediaPlayerEntity): def media_image_url(self) -> str | None: """Return the media image URL.""" if ( - self._currently_playing.get("item") is None + not self._currently_playing + or self._currently_playing.get("item") is None or not self._currently_playing["item"]["album"]["images"] ): return None @@ -361,13 +370,15 @@ class SpotifyMediaPlayer(MediaPlayerEntity): @property def media_title(self) -> str | None: """Return the media title.""" + if not self._currently_playing: + return None item = self._currently_playing.get("item") or {} return item.get("name") @property def media_artist(self) -> str | None: """Return the media artist.""" - if self._currently_playing.get("item") is None: + if not self._currently_playing or self._currently_playing.get("item") is None: return None return ", ".join( artist["name"] for artist in self._currently_playing["item"]["artists"] @@ -376,13 +387,15 @@ class SpotifyMediaPlayer(MediaPlayerEntity): @property def media_album_name(self) -> str | None: """Return the media album.""" - if self._currently_playing.get("item") is None: + if not self._currently_playing or self._currently_playing.get("item") is None: return None return self._currently_playing["item"]["album"]["name"] @property def media_track(self) -> int | None: """Track number of current playing media, music track only.""" + if not self._currently_playing: + return None item = self._currently_playing.get("item") or {} return item.get("track_number") @@ -396,6 +409,8 @@ class SpotifyMediaPlayer(MediaPlayerEntity): @property def source(self) -> str | None: """Return the current playback device.""" + if not self._currently_playing: + return None return self._currently_playing.get("device", {}).get("name") @property @@ -406,14 +421,20 @@ class SpotifyMediaPlayer(MediaPlayerEntity): return [device["name"] for device in self._devices] @property - def shuffle(self) -> bool: + def shuffle(self) -> bool | None: """Shuffling state.""" - return bool(self._currently_playing.get("shuffle_state")) + if not self._currently_playing: + return None + return self._currently_playing.get("shuffle_state") @property def repeat(self) -> str | None: """Return current repeat mode.""" - repeat_state = self._currently_playing.get("repeat_state") + if ( + not self._currently_playing + or (repeat_state := self._currently_playing.get("repeat_state")) is None + ): + return None return REPEAT_MODE_MAPPING_TO_HA.get(repeat_state) @property @@ -473,7 +494,11 @@ class SpotifyMediaPlayer(MediaPlayerEntity): _LOGGER.error("Media type %s is not supported", media_type) return - if not self._currently_playing.get("device") and self._devices: + if ( + self._currently_playing + and not self._currently_playing.get("device") + and self._devices + ): kwargs["device_id"] = self._devices[0].get("id") self._spotify.start_playback(**kwargs) @@ -481,6 +506,9 @@ class SpotifyMediaPlayer(MediaPlayerEntity): @spotify_exception_handler def select_source(self, source: str) -> None: """Select playback device.""" + if not self._devices: + return + for device in self._devices: if device["name"] == source: self._spotify.transfer_playback( @@ -525,7 +553,9 @@ class SpotifyMediaPlayer(MediaPlayerEntity): devices = self._spotify.devices() or {} self._devices = devices.get("devices", []) - async def async_browse_media(self, media_content_type=None, media_content_id=None): + async def async_browse_media( + self, media_content_type: str | None = None, media_content_id: str | None = None + ) -> BrowseMedia: """Implement the websocket media browsing helper.""" if not self._scope_ok: @@ -545,15 +575,15 @@ class SpotifyMediaPlayer(MediaPlayerEntity): async def async_browse_media_internal( - hass, - spotify, - session, - current_user, - media_content_type, - media_content_id, + hass: HomeAssistant, + spotify: Spotify, + session: OAuth2Session, + current_user: dict[str, Any], + media_content_type: str | None, + media_content_id: str | None, *, - can_play_artist=True, -): + can_play_artist: bool = True, +) -> BrowseMedia: """Browse spotify media.""" if media_content_type in (None, f"{MEDIA_PLAYER_PREFIX}library"): return await hass.async_add_executor_job( @@ -563,7 +593,8 @@ async def async_browse_media_internal( await session.async_ensure_token_valid() # Strip prefix - media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :] + if media_content_type: + media_content_type = media_content_type[len(MEDIA_PLAYER_PREFIX) :] payload = { "media_content_type": media_content_type, @@ -583,76 +614,91 @@ async def async_browse_media_internal( return response -def build_item_response(spotify, user, payload, *, can_play_artist): # noqa: C901 +def build_item_response( # noqa: C901 + spotify: Spotify, + user: dict[str, Any], + payload: dict[str, str | None], + *, + can_play_artist: bool, +) -> BrowseMedia | None: """Create response payload for the provided media query.""" media_content_type = payload["media_content_type"] media_content_id = payload["media_content_id"] + + if media_content_type is None or media_content_id is None: + return None + title = None image = None + media: dict[str, Any] | None = None + items = [] + if media_content_type == BrowsableMedia.CURRENT_USER_PLAYLISTS: - media = spotify.current_user_playlists(limit=BROWSE_LIMIT) - items = media.get("items", []) + if media := spotify.current_user_playlists(limit=BROWSE_LIMIT): + items = media.get("items", []) elif media_content_type == BrowsableMedia.CURRENT_USER_FOLLOWED_ARTISTS: - media = spotify.current_user_followed_artists(limit=BROWSE_LIMIT) - items = media.get("artists", {}).get("items", []) + if media := spotify.current_user_followed_artists(limit=BROWSE_LIMIT): + items = media.get("artists", {}).get("items", []) elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_ALBUMS: - media = spotify.current_user_saved_albums(limit=BROWSE_LIMIT) - items = [item["album"] for item in media.get("items", [])] + if media := spotify.current_user_saved_albums(limit=BROWSE_LIMIT): + items = [item["album"] for item in media.get("items", [])] elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_TRACKS: - media = spotify.current_user_saved_tracks(limit=BROWSE_LIMIT) - items = [item["track"] for item in media.get("items", [])] + if media := spotify.current_user_saved_tracks(limit=BROWSE_LIMIT): + items = [item["track"] for item in media.get("items", [])] elif media_content_type == BrowsableMedia.CURRENT_USER_SAVED_SHOWS: - media = spotify.current_user_saved_shows(limit=BROWSE_LIMIT) - items = [item["show"] for item in media.get("items", [])] + if media := spotify.current_user_saved_shows(limit=BROWSE_LIMIT): + items = [item["show"] for item in media.get("items", [])] elif media_content_type == BrowsableMedia.CURRENT_USER_RECENTLY_PLAYED: - media = spotify.current_user_recently_played(limit=BROWSE_LIMIT) - items = [item["track"] for item in media.get("items", [])] + if media := spotify.current_user_recently_played(limit=BROWSE_LIMIT): + items = [item["track"] for item in media.get("items", [])] elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_ARTISTS: - media = spotify.current_user_top_artists(limit=BROWSE_LIMIT) - items = media.get("items", []) + if media := spotify.current_user_top_artists(limit=BROWSE_LIMIT): + items = media.get("items", []) elif media_content_type == BrowsableMedia.CURRENT_USER_TOP_TRACKS: - media = spotify.current_user_top_tracks(limit=BROWSE_LIMIT) - items = media.get("items", []) + if media := spotify.current_user_top_tracks(limit=BROWSE_LIMIT): + items = media.get("items", []) elif media_content_type == BrowsableMedia.FEATURED_PLAYLISTS: - media = spotify.featured_playlists(country=user["country"], limit=BROWSE_LIMIT) - items = media.get("playlists", {}).get("items", []) + if media := spotify.featured_playlists( + country=user["country"], limit=BROWSE_LIMIT + ): + items = media.get("playlists", {}).get("items", []) elif media_content_type == BrowsableMedia.CATEGORIES: - media = spotify.categories(country=user["country"], limit=BROWSE_LIMIT) - items = media.get("categories", {}).get("items", []) + if media := spotify.categories(country=user["country"], limit=BROWSE_LIMIT): + items = media.get("categories", {}).get("items", []) elif media_content_type == "category_playlists": - media = spotify.category_playlists( - category_id=media_content_id, - country=user["country"], - limit=BROWSE_LIMIT, - ) - category = spotify.category(media_content_id, country=user["country"]) - title = category.get("name") - image = fetch_image_url(category, key="icons") - items = media.get("playlists", {}).get("items", []) + if ( + media := spotify.category_playlists( + category_id=media_content_id, + country=user["country"], + limit=BROWSE_LIMIT, + ) + ) and (category := spotify.category(media_content_id, country=user["country"])): + title = category.get("name") + image = fetch_image_url(category, key="icons") + items = media.get("playlists", {}).get("items", []) elif media_content_type == BrowsableMedia.NEW_RELEASES: - media = spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT) - items = media.get("albums", {}).get("items", []) + if media := spotify.new_releases(country=user["country"], limit=BROWSE_LIMIT): + items = media.get("albums", {}).get("items", []) elif media_content_type == MEDIA_TYPE_PLAYLIST: - media = spotify.playlist(media_content_id) - items = [item["track"] for item in media.get("tracks", {}).get("items", [])] + if media := spotify.playlist(media_content_id): + items = [item["track"] for item in media.get("tracks", {}).get("items", [])] elif media_content_type == MEDIA_TYPE_ALBUM: - media = spotify.album(media_content_id) - items = media.get("tracks", {}).get("items", []) + if media := spotify.album(media_content_id): + items = media.get("tracks", {}).get("items", []) elif media_content_type == MEDIA_TYPE_ARTIST: - media = spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT) - artist = spotify.artist(media_content_id) - title = artist.get("name") - image = fetch_image_url(artist) - items = media.get("items", []) + if (media := spotify.artist_albums(media_content_id, limit=BROWSE_LIMIT)) and ( + artist := spotify.artist(media_content_id) + ): + title = artist.get("name") + image = fetch_image_url(artist) + items = media.get("items", []) elif media_content_type == MEDIA_TYPE_SHOW: - media = spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT) - show = spotify.show(media_content_id) - title = show.get("name") - image = fetch_image_url(show) - items = media.get("items", []) - else: - media = None - items = [] + if (media := spotify.show_episodes(media_content_id, limit=BROWSE_LIMIT)) and ( + show := spotify.show(media_content_id) + ): + title = show.get("name") + image = fetch_image_url(show) + items = media.get("items", []) if media is None: return None @@ -665,15 +711,16 @@ def build_item_response(spotify, user, payload, *, can_play_artist): # noqa: C9 if media_content_type == BrowsableMedia.CATEGORIES: media_item = BrowseMedia( - title=LIBRARY_MAP.get(media_content_id), - media_class=media_class["parent"], - children_media_class=media_class["children"], - media_content_id=media_content_id, - media_content_type=MEDIA_PLAYER_PREFIX + media_content_type, - can_play=False, can_expand=True, - children=[], + can_play=False, + children_media_class=media_class["children"], + media_class=media_class["parent"], + media_content_id=media_content_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}", + title=LIBRARY_MAP.get(media_content_id, "Unknown"), ) + + media_item.children = [] for item in items: try: item_id = item["id"] @@ -682,52 +729,54 @@ def build_item_response(spotify, user, payload, *, can_play_artist): # noqa: C9 continue media_item.children.append( BrowseMedia( - title=item.get("name"), - media_class=MEDIA_CLASS_PLAYLIST, - children_media_class=MEDIA_CLASS_TRACK, - media_content_id=item_id, - media_content_type=MEDIA_PLAYER_PREFIX + "category_playlists", - thumbnail=fetch_image_url(item, key="icons"), - can_play=False, can_expand=True, + can_play=False, + children_media_class=MEDIA_CLASS_TRACK, + media_class=MEDIA_CLASS_PLAYLIST, + media_content_id=item_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}category_playlists", + thumbnail=fetch_image_url(item, key="icons"), + title=item.get("name"), ) ) return media_item if title is None: + title = LIBRARY_MAP.get(media_content_id, "Unknown") if "name" in media: - title = media.get("name") - else: - title = LIBRARY_MAP.get(payload["media_content_id"]) + title = media["name"] - params = { - "title": title, - "media_class": media_class["parent"], - "children_media_class": media_class["children"], - "media_content_id": media_content_id, - "media_content_type": MEDIA_PLAYER_PREFIX + media_content_type, - "can_play": media_content_type in PLAYABLE_MEDIA_TYPES - and (media_content_type != MEDIA_TYPE_ARTIST or can_play_artist), - "children": [], - "can_expand": True, - } + can_play = media_content_type in PLAYABLE_MEDIA_TYPES and ( + media_content_type != MEDIA_TYPE_ARTIST or can_play_artist + ) + + browse_media = BrowseMedia( + can_expand=True, + can_play=can_play, + children_media_class=media_class["children"], + media_class=media_class["parent"], + media_content_id=media_content_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_content_type}", + thumbnail=image, + title=title, + ) + + browse_media.children = [] for item in items: try: - params["children"].append( + browse_media.children.append( item_payload(item, can_play_artist=can_play_artist) ) except (MissingMediaInformation, UnknownMediaType): continue if "images" in media: - params["thumbnail"] = fetch_image_url(media) - elif image: - params["thumbnail"] = image + browse_media.thumbnail = fetch_image_url(media) - return BrowseMedia(**params) + return browse_media -def item_payload(item, *, can_play_artist): +def item_payload(item: dict[str, Any], *, can_play_artist: bool) -> BrowseMedia: """ Create response payload for a single media item. @@ -751,54 +800,56 @@ def item_payload(item, *, can_play_artist): MEDIA_TYPE_EPISODE, ] - payload = { - "title": item.get("name"), - "media_class": media_class["parent"], - "children_media_class": media_class["children"], - "media_content_id": media_id, - "media_content_type": MEDIA_PLAYER_PREFIX + media_type, - "can_play": media_type in PLAYABLE_MEDIA_TYPES - and (media_type != MEDIA_TYPE_ARTIST or can_play_artist), - "can_expand": can_expand, - } + can_play = media_type in PLAYABLE_MEDIA_TYPES and ( + media_type != MEDIA_TYPE_ARTIST or can_play_artist + ) + + browse_media = BrowseMedia( + can_expand=can_expand, + can_play=can_play, + children_media_class=media_class["children"], + media_class=media_class["parent"], + media_content_id=media_id, + media_content_type=f"{MEDIA_PLAYER_PREFIX}{media_type}", + title=item.get("name", "Unknown"), + ) if "images" in item: - payload["thumbnail"] = fetch_image_url(item) + browse_media.thumbnail = fetch_image_url(item) elif MEDIA_TYPE_ALBUM in item: - payload["thumbnail"] = fetch_image_url(item[MEDIA_TYPE_ALBUM]) + browse_media.thumbnail = fetch_image_url(item[MEDIA_TYPE_ALBUM]) - return BrowseMedia(**payload) + return browse_media -def library_payload(*, can_play_artist): +def library_payload(*, can_play_artist: bool) -> BrowseMedia: """ Create response payload to describe contents of a specific library. Used by async_browse_media. """ - library_info = { - "title": "Media Library", - "media_class": MEDIA_CLASS_DIRECTORY, - "media_content_id": "library", - "media_content_type": MEDIA_PLAYER_PREFIX + "library", - "can_play": False, - "can_expand": True, - "children": [], - } + browse_media = BrowseMedia( + can_expand=True, + can_play=False, + children_media_class=MEDIA_CLASS_DIRECTORY, + media_class=MEDIA_CLASS_DIRECTORY, + media_content_id="library", + media_content_type=f"{MEDIA_PLAYER_PREFIX}library", + title="Media Library", + ) + browse_media.children = [] for item in [{"name": n, "type": t} for t, n in LIBRARY_MAP.items()]: - library_info["children"].append( + browse_media.children.append( item_payload( {"name": item["name"], "type": item["type"], "uri": item["type"]}, can_play_artist=can_play_artist, ) ) - response = BrowseMedia(**library_info) - response.children_media_class = MEDIA_CLASS_DIRECTORY - return response + return browse_media -def fetch_image_url(item, key="images"): +def fetch_image_url(item: dict[str, Any], key="images") -> str | None: """Fetch image url.""" try: return item.get(key, [])[0].get("url") diff --git a/homeassistant/components/spotify/strings.json b/homeassistant/components/spotify/strings.json index f775e5df85d..caec5b8a288 100644 --- a/homeassistant/components/spotify/strings.json +++ b/homeassistant/components/spotify/strings.json @@ -11,8 +11,8 @@ }, "abort": { "authorize_url_timeout": "Timeout generating authorize URL.", - "no_url_available": "[%key:common::config_flow::abort::oauth2_no_url_available%]", "missing_configuration": "The Spotify integration is not configured. Please follow the documentation.", + "no_url_available": "[%key:common::config_flow::abort::oauth2_no_url_available%]", "reauth_account_mismatch": "The Spotify account authenticated with, does not match the account needed re-authentication." }, "create_entry": { "default": "Successfully authenticated with Spotify." } diff --git a/mypy.ini b/mypy.ini index 96d58927959..31788f3643d 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2646,12 +2646,6 @@ ignore_errors = true [mypy-homeassistant.components.sonos.statistics] ignore_errors = true -[mypy-homeassistant.components.spotify.config_flow] -ignore_errors = true - -[mypy-homeassistant.components.spotify.media_player] -ignore_errors = true - [mypy-homeassistant.components.system_health] ignore_errors = true diff --git a/script/hassfest/mypy_config.py b/script/hassfest/mypy_config.py index 6b14803b499..5e3d9bd4b11 100644 --- a/script/hassfest/mypy_config.py +++ b/script/hassfest/mypy_config.py @@ -188,8 +188,6 @@ IGNORED_MODULES: Final[list[str]] = [ "homeassistant.components.sonos.sensor", "homeassistant.components.sonos.speaker", "homeassistant.components.sonos.statistics", - "homeassistant.components.spotify.config_flow", - "homeassistant.components.spotify.media_player", "homeassistant.components.system_health", "homeassistant.components.telegram_bot.polling", "homeassistant.components.template", diff --git a/tests/components/spotify/test_config_flow.py b/tests/components/spotify/test_config_flow.py index fb0279f9112..a1a77da1d10 100644 --- a/tests/components/spotify/test_config_flow.py +++ b/tests/components/spotify/test_config_flow.py @@ -194,7 +194,13 @@ async def test_reauthentication( old_entry.add_to_hass(hass) result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": SOURCE_REAUTH}, data=old_entry.data + DOMAIN, + context={ + "source": SOURCE_REAUTH, + "unique_id": old_entry.unique_id, + "entry_id": old_entry.entry_id, + }, + data=old_entry.data, ) flows = hass.config_entries.flow.async_progress() @@ -261,7 +267,13 @@ async def test_reauth_account_mismatch( old_entry.add_to_hass(hass) result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": SOURCE_REAUTH}, data=old_entry.data + DOMAIN, + context={ + "source": SOURCE_REAUTH, + "unique_id": old_entry.unique_id, + "entry_id": old_entry.entry_id, + }, + data=old_entry.data, ) flows = hass.config_entries.flow.async_progress() @@ -294,3 +306,13 @@ async def test_reauth_account_mismatch( assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT assert result["reason"] == "reauth_account_mismatch" + + +async def test_abort_if_no_reauth_entry(hass): + """Check flow aborts when no entry is known when entring reauth confirmation.""" + result = await hass.config_entries.flow.async_init( + DOMAIN, context={"source": "reauth_confirm"} + ) + + assert result.get("type") == data_entry_flow.RESULT_TYPE_ABORT + assert result.get("reason") == "reauth_account_mismatch"