diff --git a/.strict-typing b/.strict-typing index 797a1b51293..1d73b05fdea 100644 --- a/.strict-typing +++ b/.strict-typing @@ -416,6 +416,7 @@ homeassistant.components.solarlog.* homeassistant.components.sonarr.* homeassistant.components.speedtestdotnet.* homeassistant.components.sql.* +homeassistant.components.squeezebox.* homeassistant.components.ssdp.* homeassistant.components.starlink.* homeassistant.components.statistics.* diff --git a/homeassistant/components/squeezebox/browse_media.py b/homeassistant/components/squeezebox/browse_media.py index f68624f8f06..61ae7b7a403 100644 --- a/homeassistant/components/squeezebox/browse_media.py +++ b/homeassistant/components/squeezebox/browse_media.py @@ -1,14 +1,21 @@ """Support for media browsing.""" +from __future__ import annotations + import contextlib +from typing import Any + +from pysqueezebox import Player from homeassistant.components import media_source from homeassistant.components.media_player import ( BrowseError, BrowseMedia, MediaClass, + MediaPlayerEntity, MediaType, ) +from homeassistant.core import HomeAssistant from homeassistant.helpers.network import is_internal_request LIBRARY = ["Favorites", "Artists", "Albums", "Tracks", "Playlists", "Genres"] @@ -36,7 +43,7 @@ SQUEEZEBOX_ID_BY_TYPE = { "Favorites": "item_id", } -CONTENT_TYPE_MEDIA_CLASS = { +CONTENT_TYPE_MEDIA_CLASS: dict[str | MediaType, dict[str, MediaClass | None]] = { "Favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK}, "Artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST}, "Albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM}, @@ -66,14 +73,18 @@ CONTENT_TYPE_TO_CHILD_TYPE = { BROWSE_LIMIT = 1000 -async def build_item_response(entity, player, payload): +async def build_item_response( + entity: MediaPlayerEntity, player: Player, payload: dict[str, str | None] +) -> BrowseMedia: """Create response payload for search described by payload.""" internal_request = is_internal_request(entity.hass) search_id = payload["search_id"] search_type = payload["search_type"] - + assert ( + search_type is not None + ) # async_browse_media will not call this function if search_type is None media_class = CONTENT_TYPE_MEDIA_CLASS[search_type] children = None @@ -95,9 +106,9 @@ async def build_item_response(entity, player, payload): children = [] for item in result["items"]: item_id = str(item["id"]) - item_thumbnail = None + item_thumbnail: str | None = None if item_type: - child_item_type = item_type + child_item_type: MediaType | str = item_type child_media_class = CONTENT_TYPE_MEDIA_CLASS[item_type] can_expand = child_media_class["children"] is not None can_play = True @@ -120,7 +131,7 @@ async def build_item_response(entity, player, payload): can_expand = False can_play = True - if artwork_track_id := item.get("artwork_track_id"): + if artwork_track_id := item.get("artwork_track_id") and item_type: if internal_request: item_thumbnail = player.generate_image_url_from_track_id( artwork_track_id @@ -132,6 +143,7 @@ async def build_item_response(entity, player, payload): else: item_thumbnail = item.get("image_url") # will not be proxied by HA + assert child_media_class["item"] is not None children.append( BrowseMedia( title=item["title"], @@ -147,6 +159,9 @@ async def build_item_response(entity, player, payload): if children is None: raise BrowseError(f"Media not found: {search_type} / {search_id}") + assert media_class["item"] is not None + if not search_id: + search_id = search_type return BrowseMedia( title=result.get("title"), media_class=media_class["item"], @@ -159,9 +174,9 @@ async def build_item_response(entity, player, payload): ) -async def library_payload(hass, player): +async def library_payload(hass: HomeAssistant, player: Player) -> BrowseMedia: """Create response payload to describe contents of library.""" - library_info = { + library_info: dict[str, Any] = { "title": "Music Library", "media_class": MediaClass.DIRECTORY, "media_content_id": "library", @@ -179,6 +194,7 @@ async def library_payload(hass, player): limit=1, ) if result is not None and result.get("items") is not None: + assert media_class["children"] is not None library_info["children"].append( BrowseMedia( title=item, @@ -191,14 +207,14 @@ async def library_payload(hass, player): ) with contextlib.suppress(media_source.BrowseError): - item = await media_source.async_browse_media( + browse = await media_source.async_browse_media( hass, None, content_filter=media_source_content_filter ) # If domain is None, it's overview of available sources - if item.domain is None: - library_info["children"].extend(item.children) + if browse.domain is None: + library_info["children"].extend(browse.children) else: - library_info["children"].append(item) + library_info["children"].append(browse) return BrowseMedia(**library_info) @@ -208,7 +224,7 @@ def media_source_content_filter(item: BrowseMedia) -> bool: return item.media_content_type.startswith("audio/") -async def generate_playlist(player, payload): +async def generate_playlist(player: Player, payload: dict[str, str]) -> list | None: """Generate playlist from browsing payload.""" media_type = payload["search_type"] media_id = payload["search_id"] @@ -221,5 +237,6 @@ async def generate_playlist(player, payload): "titles", limit=BROWSE_LIMIT, browse_id=browse_id ) if result and "items" in result: - return result["items"] + items: list = result["items"] + return items raise BrowseError(f"Media not found: {media_type} / {media_id}") diff --git a/homeassistant/components/squeezebox/config_flow.py b/homeassistant/components/squeezebox/config_flow.py index fe57b12516a..c372c7262d4 100644 --- a/homeassistant/components/squeezebox/config_flow.py +++ b/homeassistant/components/squeezebox/config_flow.py @@ -1,5 +1,7 @@ """Config flow for Squeezebox integration.""" +from __future__ import annotations + import asyncio from http import HTTPStatus import logging @@ -24,9 +26,11 @@ _LOGGER = logging.getLogger(__name__) TIMEOUT = 5 -def _base_schema(discovery_info=None): +def _base_schema( + discovery_info: dict[str, Any] | None = None, +) -> vol.Schema: """Generate base schema.""" - base_schema = {} + base_schema: dict[Any, Any] = {} if discovery_info and CONF_HOST in discovery_info: base_schema.update( { @@ -71,14 +75,14 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN): def __init__(self) -> None: """Initialize an instance of the squeezebox config flow.""" self.data_schema = _base_schema() - self.discovery_info = None + self.discovery_info: dict[str, Any] | None = None - async def _discover(self, uuid=None): + async def _discover(self, uuid: str | None = None) -> None: """Discover an unconfigured LMS server.""" self.discovery_info = None discovery_event = asyncio.Event() - def _discovery_callback(server): + def _discovery_callback(server: Server) -> None: if server.uuid: # ignore already configured uuids for entry in self._async_current_entries(): @@ -156,7 +160,9 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN): errors=errors, ) - async def async_step_edit(self, user_input=None): + async def async_step_edit( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: """Edit a discovered or manually inputted server.""" errors = {} if user_input: @@ -171,7 +177,9 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN): step_id="edit", data_schema=self.data_schema, errors=errors ) - async def async_step_integration_discovery(self, discovery_info): + async def async_step_integration_discovery( + self, discovery_info: dict[str, Any] + ) -> ConfigFlowResult: """Handle discovery of a server.""" _LOGGER.debug("Reached server discovery flow with info: %s", discovery_info) if "uuid" in discovery_info: diff --git a/homeassistant/components/squeezebox/media_player.py b/homeassistant/components/squeezebox/media_player.py index 279e51485f0..5fa132533d1 100644 --- a/homeassistant/components/squeezebox/media_player.py +++ b/homeassistant/components/squeezebox/media_player.py @@ -8,12 +8,13 @@ import json import logging from typing import Any -from pysqueezebox import Player, async_discover +from pysqueezebox import Player, Server, async_discover import voluptuous as vol from homeassistant.components import media_source from homeassistant.components.media_player import ( ATTR_MEDIA_ENQUEUE, + BrowseMedia, MediaPlayerEnqueue, MediaPlayerEntity, MediaPlayerEntityFeature, @@ -87,7 +88,7 @@ SQUEEZEBOX_MODE = { async def start_server_discovery(hass: HomeAssistant) -> None: """Start a server discovery task.""" - def _discovered_server(server): + def _discovered_server(server: Server) -> None: discovery_flow.async_create_flow( hass, DOMAIN, @@ -118,10 +119,10 @@ async def async_setup_entry( known_players = hass.data[DOMAIN].setdefault(KNOWN_PLAYERS, []) lms = entry.runtime_data - async def _player_discovery(now=None): + async def _player_discovery(now: datetime | None = None) -> None: """Discover squeezebox players by polling server.""" - async def _discovered_player(player): + async def _discovered_player(player: Player) -> None: """Handle a (re)discovered player.""" entity = next( ( @@ -234,7 +235,7 @@ class SqueezeBoxEntity(MediaPlayerEntity): ) @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> dict[str, Any]: """Return device-specific attributes.""" return { attr: getattr(self, attr) @@ -243,12 +244,13 @@ class SqueezeBoxEntity(MediaPlayerEntity): } @callback - def rediscovered(self, unique_id, connected): + def rediscovered(self, unique_id: str, connected: bool) -> None: """Make a player available again.""" if unique_id == self.unique_id and connected: self._attr_available = True _LOGGER.debug("Player %s is available again", self.name) - self._remove_dispatcher() + if self._remove_dispatcher: + self._remove_dispatcher() @property def state(self) -> MediaPlayerState | None: @@ -288,22 +290,22 @@ class SqueezeBoxEntity(MediaPlayerEntity): return None @property - def is_volume_muted(self): + def is_volume_muted(self) -> bool: """Return true if volume is muted.""" - return self._player.muting + return bool(self._player.muting) @property - def media_content_id(self): + def media_content_id(self) -> str | None: """Content ID of current playing media.""" if not self._player.playlist: return None if len(self._player.playlist) > 1: urls = [{"url": track["url"]} for track in self._player.playlist] return json.dumps({"index": self._player.current_index, "urls": urls}) - return self._player.url + return str(self._player.url) @property - def media_content_type(self): + def media_content_type(self) -> MediaType | None: """Content type of current playing media.""" if not self._player.playlist: return None @@ -312,47 +314,47 @@ class SqueezeBoxEntity(MediaPlayerEntity): return MediaType.MUSIC @property - def media_duration(self): + def media_duration(self) -> int | None: """Duration of current playing media in seconds.""" - return self._player.duration + return int(self._player.duration) @property - def media_position(self): + def media_position(self) -> int | None: """Position of current playing media in seconds.""" - return self._player.time + return int(self._player.time) @property - def media_position_updated_at(self): + def media_position_updated_at(self) -> datetime | None: """Last time status was updated.""" return self._last_update @property - def media_image_url(self): + def media_image_url(self) -> str | None: """Image url of current playing media.""" - return self._player.image_url + return str(self._player.image_url) @property - def media_title(self): + def media_title(self) -> str | None: """Title of current playing media.""" - return self._player.title + return str(self._player.title) @property - def media_channel(self): + def media_channel(self) -> str | None: """Channel (e.g. webradio name) of current playing media.""" - return self._player.remote_title + return str(self._player.remote_title) @property - def media_artist(self): + def media_artist(self) -> str | None: """Artist of current playing media.""" - return self._player.artist + return str(self._player.artist) @property - def media_album_name(self): + def media_album_name(self) -> str | None: """Album of current playing media.""" - return self._player.album + return str(self._player.album) @property - def repeat(self): + def repeat(self) -> RepeatMode: """Repeat setting.""" if self._player.repeat == "song": return RepeatMode.ONE @@ -361,13 +363,13 @@ class SqueezeBoxEntity(MediaPlayerEntity): return RepeatMode.OFF @property - def shuffle(self): + def shuffle(self) -> bool: """Boolean if shuffle is enabled.""" # Squeezebox has a third shuffle mode (album) not recognized by Home Assistant - return self._player.shuffle == "song" + return bool(self._player.shuffle == "song") @property - def group_members(self): + def group_members(self) -> list[str]: """List players we are synced with.""" player_ids = { p.unique_id: p.entity_id for p in self.hass.data[DOMAIN][KNOWN_PLAYERS] @@ -379,12 +381,12 @@ class SqueezeBoxEntity(MediaPlayerEntity): ] @property - def sync_group(self): + def sync_group(self) -> list[str]: """List players we are synced with. Deprecated.""" return self.group_members @property - def query_result(self): + def query_result(self) -> dict | bool: """Return the result from the call_query service.""" return self._query_result @@ -477,7 +479,7 @@ class SqueezeBoxEntity(MediaPlayerEntity): try: # a saved playlist by number payload = { - "search_id": int(media_id), + "search_id": media_id, "search_type": MediaType.PLAYLIST, } playlist = await generate_playlist(self._player, payload) @@ -519,7 +521,9 @@ class SqueezeBoxEntity(MediaPlayerEntity): """Send the media player the command for clear playlist.""" await self._player.async_clear_playlist() - async def async_call_method(self, command, parameters=None): + async def async_call_method( + self, command: str, parameters: list[str] | None = None + ) -> None: """Call Squeezebox JSON/RPC method. Additional parameters are added to the command to form the list of @@ -530,7 +534,9 @@ class SqueezeBoxEntity(MediaPlayerEntity): all_params.extend(parameters) await self._player.async_query(*all_params) - async def async_call_query(self, command, parameters=None): + async def async_call_query( + self, command: str, parameters: list[str] | None = None + ) -> None: """Call Squeezebox JSON/RPC method where we care about the result. Additional parameters are added to the command to form the list of @@ -560,7 +566,7 @@ class SqueezeBoxEntity(MediaPlayerEntity): "Could not find player_id for %s. Not syncing", other_player ) - async def async_sync(self, other_player): + async def async_sync(self, other_player: str) -> None: """Sync this Squeezebox player to another. Deprecated.""" _LOGGER.warning( "Service squeezebox.sync is deprecated; use media_player.join_players" @@ -572,7 +578,7 @@ class SqueezeBoxEntity(MediaPlayerEntity): """Unsync this Squeezebox player.""" await self._player.async_unsync() - async def async_unsync(self): + async def async_unsync(self) -> None: """Unsync this Squeezebox player. Deprecated.""" _LOGGER.warning( "Service squeezebox.unsync is deprecated; use media_player.unjoin_player" @@ -580,7 +586,11 @@ class SqueezeBoxEntity(MediaPlayerEntity): ) await self.async_unjoin_player() - async def async_browse_media(self, media_content_type=None, media_content_id=None): + async def async_browse_media( + self, + media_content_type: MediaType | str | None = None, + media_content_id: str | None = None, + ) -> BrowseMedia: """Implement the websocket media browsing helper.""" _LOGGER.debug( "Reached async_browse_media with content_type %s and content_id %s", diff --git a/mypy.ini b/mypy.ini index c29db45cd53..4ba1f41f4d4 100644 --- a/mypy.ini +++ b/mypy.ini @@ -3916,6 +3916,16 @@ disallow_untyped_defs = true warn_return_any = true warn_unreachable = true +[mypy-homeassistant.components.squeezebox.*] +check_untyped_defs = true +disallow_incomplete_defs = true +disallow_subclassing_any = true +disallow_untyped_calls = true +disallow_untyped_decorators = true +disallow_untyped_defs = true +warn_return_any = true +warn_unreachable = true + [mypy-homeassistant.components.ssdp.*] check_untyped_defs = true disallow_incomplete_defs = true