Enable strict typing for the Squeezebox integration (#125161)

* Strict typing for squeezebox

* Improve unit tests

* Refactor tests to use websockets and services.async_call

* Apply suggestions from code review

* Fix merge conflict
This commit is contained in:
Raj Laud 2024-09-03 13:19:30 -04:00 committed by GitHub
parent 00533bae4b
commit 8f26cff65a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 106 additions and 60 deletions

View File

@ -416,6 +416,7 @@ homeassistant.components.solarlog.*
homeassistant.components.sonarr.* homeassistant.components.sonarr.*
homeassistant.components.speedtestdotnet.* homeassistant.components.speedtestdotnet.*
homeassistant.components.sql.* homeassistant.components.sql.*
homeassistant.components.squeezebox.*
homeassistant.components.ssdp.* homeassistant.components.ssdp.*
homeassistant.components.starlink.* homeassistant.components.starlink.*
homeassistant.components.statistics.* homeassistant.components.statistics.*

View File

@ -1,14 +1,21 @@
"""Support for media browsing.""" """Support for media browsing."""
from __future__ import annotations
import contextlib import contextlib
from typing import Any
from pysqueezebox import Player
from homeassistant.components import media_source from homeassistant.components import media_source
from homeassistant.components.media_player import ( from homeassistant.components.media_player import (
BrowseError, BrowseError,
BrowseMedia, BrowseMedia,
MediaClass, MediaClass,
MediaPlayerEntity,
MediaType, MediaType,
) )
from homeassistant.core import HomeAssistant
from homeassistant.helpers.network import is_internal_request from homeassistant.helpers.network import is_internal_request
LIBRARY = ["Favorites", "Artists", "Albums", "Tracks", "Playlists", "Genres"] LIBRARY = ["Favorites", "Artists", "Albums", "Tracks", "Playlists", "Genres"]
@ -36,7 +43,7 @@ SQUEEZEBOX_ID_BY_TYPE = {
"Favorites": "item_id", "Favorites": "item_id",
} }
CONTENT_TYPE_MEDIA_CLASS = { CONTENT_TYPE_MEDIA_CLASS: dict[str | MediaType, dict[str, MediaClass | None]] = {
"Favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK}, "Favorites": {"item": MediaClass.DIRECTORY, "children": MediaClass.TRACK},
"Artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST}, "Artists": {"item": MediaClass.DIRECTORY, "children": MediaClass.ARTIST},
"Albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM}, "Albums": {"item": MediaClass.DIRECTORY, "children": MediaClass.ALBUM},
@ -66,14 +73,18 @@ CONTENT_TYPE_TO_CHILD_TYPE = {
BROWSE_LIMIT = 1000 BROWSE_LIMIT = 1000
async def build_item_response(entity, player, payload): async def build_item_response(
entity: MediaPlayerEntity, player: Player, payload: dict[str, str | None]
) -> BrowseMedia:
"""Create response payload for search described by payload.""" """Create response payload for search described by payload."""
internal_request = is_internal_request(entity.hass) internal_request = is_internal_request(entity.hass)
search_id = payload["search_id"] search_id = payload["search_id"]
search_type = payload["search_type"] search_type = payload["search_type"]
assert (
search_type is not None
) # async_browse_media will not call this function if search_type is None
media_class = CONTENT_TYPE_MEDIA_CLASS[search_type] media_class = CONTENT_TYPE_MEDIA_CLASS[search_type]
children = None children = None
@ -95,9 +106,9 @@ async def build_item_response(entity, player, payload):
children = [] children = []
for item in result["items"]: for item in result["items"]:
item_id = str(item["id"]) item_id = str(item["id"])
item_thumbnail = None item_thumbnail: str | None = None
if item_type: if item_type:
child_item_type = item_type child_item_type: MediaType | str = item_type
child_media_class = CONTENT_TYPE_MEDIA_CLASS[item_type] child_media_class = CONTENT_TYPE_MEDIA_CLASS[item_type]
can_expand = child_media_class["children"] is not None can_expand = child_media_class["children"] is not None
can_play = True can_play = True
@ -120,7 +131,7 @@ async def build_item_response(entity, player, payload):
can_expand = False can_expand = False
can_play = True can_play = True
if artwork_track_id := item.get("artwork_track_id"): if artwork_track_id := item.get("artwork_track_id") and item_type:
if internal_request: if internal_request:
item_thumbnail = player.generate_image_url_from_track_id( item_thumbnail = player.generate_image_url_from_track_id(
artwork_track_id artwork_track_id
@ -132,6 +143,7 @@ async def build_item_response(entity, player, payload):
else: else:
item_thumbnail = item.get("image_url") # will not be proxied by HA item_thumbnail = item.get("image_url") # will not be proxied by HA
assert child_media_class["item"] is not None
children.append( children.append(
BrowseMedia( BrowseMedia(
title=item["title"], title=item["title"],
@ -147,6 +159,9 @@ async def build_item_response(entity, player, payload):
if children is None: if children is None:
raise BrowseError(f"Media not found: {search_type} / {search_id}") raise BrowseError(f"Media not found: {search_type} / {search_id}")
assert media_class["item"] is not None
if not search_id:
search_id = search_type
return BrowseMedia( return BrowseMedia(
title=result.get("title"), title=result.get("title"),
media_class=media_class["item"], media_class=media_class["item"],
@ -159,9 +174,9 @@ async def build_item_response(entity, player, payload):
) )
async def library_payload(hass, player): async def library_payload(hass: HomeAssistant, player: Player) -> BrowseMedia:
"""Create response payload to describe contents of library.""" """Create response payload to describe contents of library."""
library_info = { library_info: dict[str, Any] = {
"title": "Music Library", "title": "Music Library",
"media_class": MediaClass.DIRECTORY, "media_class": MediaClass.DIRECTORY,
"media_content_id": "library", "media_content_id": "library",
@ -179,6 +194,7 @@ async def library_payload(hass, player):
limit=1, limit=1,
) )
if result is not None and result.get("items") is not None: if result is not None and result.get("items") is not None:
assert media_class["children"] is not None
library_info["children"].append( library_info["children"].append(
BrowseMedia( BrowseMedia(
title=item, title=item,
@ -191,14 +207,14 @@ async def library_payload(hass, player):
) )
with contextlib.suppress(media_source.BrowseError): with contextlib.suppress(media_source.BrowseError):
item = await media_source.async_browse_media( browse = await media_source.async_browse_media(
hass, None, content_filter=media_source_content_filter hass, None, content_filter=media_source_content_filter
) )
# If domain is None, it's overview of available sources # If domain is None, it's overview of available sources
if item.domain is None: if browse.domain is None:
library_info["children"].extend(item.children) library_info["children"].extend(browse.children)
else: else:
library_info["children"].append(item) library_info["children"].append(browse)
return BrowseMedia(**library_info) return BrowseMedia(**library_info)
@ -208,7 +224,7 @@ def media_source_content_filter(item: BrowseMedia) -> bool:
return item.media_content_type.startswith("audio/") return item.media_content_type.startswith("audio/")
async def generate_playlist(player, payload): async def generate_playlist(player: Player, payload: dict[str, str]) -> list | None:
"""Generate playlist from browsing payload.""" """Generate playlist from browsing payload."""
media_type = payload["search_type"] media_type = payload["search_type"]
media_id = payload["search_id"] media_id = payload["search_id"]
@ -221,5 +237,6 @@ async def generate_playlist(player, payload):
"titles", limit=BROWSE_LIMIT, browse_id=browse_id "titles", limit=BROWSE_LIMIT, browse_id=browse_id
) )
if result and "items" in result: if result and "items" in result:
return result["items"] items: list = result["items"]
return items
raise BrowseError(f"Media not found: {media_type} / {media_id}") raise BrowseError(f"Media not found: {media_type} / {media_id}")

View File

@ -1,5 +1,7 @@
"""Config flow for Squeezebox integration.""" """Config flow for Squeezebox integration."""
from __future__ import annotations
import asyncio import asyncio
from http import HTTPStatus from http import HTTPStatus
import logging import logging
@ -24,9 +26,11 @@ _LOGGER = logging.getLogger(__name__)
TIMEOUT = 5 TIMEOUT = 5
def _base_schema(discovery_info=None): def _base_schema(
discovery_info: dict[str, Any] | None = None,
) -> vol.Schema:
"""Generate base schema.""" """Generate base schema."""
base_schema = {} base_schema: dict[Any, Any] = {}
if discovery_info and CONF_HOST in discovery_info: if discovery_info and CONF_HOST in discovery_info:
base_schema.update( base_schema.update(
{ {
@ -71,14 +75,14 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize an instance of the squeezebox config flow.""" """Initialize an instance of the squeezebox config flow."""
self.data_schema = _base_schema() self.data_schema = _base_schema()
self.discovery_info = None self.discovery_info: dict[str, Any] | None = None
async def _discover(self, uuid=None): async def _discover(self, uuid: str | None = None) -> None:
"""Discover an unconfigured LMS server.""" """Discover an unconfigured LMS server."""
self.discovery_info = None self.discovery_info = None
discovery_event = asyncio.Event() discovery_event = asyncio.Event()
def _discovery_callback(server): def _discovery_callback(server: Server) -> None:
if server.uuid: if server.uuid:
# ignore already configured uuids # ignore already configured uuids
for entry in self._async_current_entries(): for entry in self._async_current_entries():
@ -156,7 +160,9 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
errors=errors, errors=errors,
) )
async def async_step_edit(self, user_input=None): async def async_step_edit(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Edit a discovered or manually inputted server.""" """Edit a discovered or manually inputted server."""
errors = {} errors = {}
if user_input: if user_input:
@ -171,7 +177,9 @@ class SqueezeboxConfigFlow(ConfigFlow, domain=DOMAIN):
step_id="edit", data_schema=self.data_schema, errors=errors step_id="edit", data_schema=self.data_schema, errors=errors
) )
async def async_step_integration_discovery(self, discovery_info): async def async_step_integration_discovery(
self, discovery_info: dict[str, Any]
) -> ConfigFlowResult:
"""Handle discovery of a server.""" """Handle discovery of a server."""
_LOGGER.debug("Reached server discovery flow with info: %s", discovery_info) _LOGGER.debug("Reached server discovery flow with info: %s", discovery_info)
if "uuid" in discovery_info: if "uuid" in discovery_info:

View File

@ -8,12 +8,13 @@ import json
import logging import logging
from typing import Any from typing import Any
from pysqueezebox import Player, async_discover from pysqueezebox import Player, Server, async_discover
import voluptuous as vol import voluptuous as vol
from homeassistant.components import media_source from homeassistant.components import media_source
from homeassistant.components.media_player import ( from homeassistant.components.media_player import (
ATTR_MEDIA_ENQUEUE, ATTR_MEDIA_ENQUEUE,
BrowseMedia,
MediaPlayerEnqueue, MediaPlayerEnqueue,
MediaPlayerEntity, MediaPlayerEntity,
MediaPlayerEntityFeature, MediaPlayerEntityFeature,
@ -87,7 +88,7 @@ SQUEEZEBOX_MODE = {
async def start_server_discovery(hass: HomeAssistant) -> None: async def start_server_discovery(hass: HomeAssistant) -> None:
"""Start a server discovery task.""" """Start a server discovery task."""
def _discovered_server(server): def _discovered_server(server: Server) -> None:
discovery_flow.async_create_flow( discovery_flow.async_create_flow(
hass, hass,
DOMAIN, DOMAIN,
@ -118,10 +119,10 @@ async def async_setup_entry(
known_players = hass.data[DOMAIN].setdefault(KNOWN_PLAYERS, []) known_players = hass.data[DOMAIN].setdefault(KNOWN_PLAYERS, [])
lms = entry.runtime_data lms = entry.runtime_data
async def _player_discovery(now=None): async def _player_discovery(now: datetime | None = None) -> None:
"""Discover squeezebox players by polling server.""" """Discover squeezebox players by polling server."""
async def _discovered_player(player): async def _discovered_player(player: Player) -> None:
"""Handle a (re)discovered player.""" """Handle a (re)discovered player."""
entity = next( entity = next(
( (
@ -234,7 +235,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
) )
@property @property
def extra_state_attributes(self): def extra_state_attributes(self) -> dict[str, Any]:
"""Return device-specific attributes.""" """Return device-specific attributes."""
return { return {
attr: getattr(self, attr) attr: getattr(self, attr)
@ -243,12 +244,13 @@ class SqueezeBoxEntity(MediaPlayerEntity):
} }
@callback @callback
def rediscovered(self, unique_id, connected): def rediscovered(self, unique_id: str, connected: bool) -> None:
"""Make a player available again.""" """Make a player available again."""
if unique_id == self.unique_id and connected: if unique_id == self.unique_id and connected:
self._attr_available = True self._attr_available = True
_LOGGER.debug("Player %s is available again", self.name) _LOGGER.debug("Player %s is available again", self.name)
self._remove_dispatcher() if self._remove_dispatcher:
self._remove_dispatcher()
@property @property
def state(self) -> MediaPlayerState | None: def state(self) -> MediaPlayerState | None:
@ -288,22 +290,22 @@ class SqueezeBoxEntity(MediaPlayerEntity):
return None return None
@property @property
def is_volume_muted(self): def is_volume_muted(self) -> bool:
"""Return true if volume is muted.""" """Return true if volume is muted."""
return self._player.muting return bool(self._player.muting)
@property @property
def media_content_id(self): def media_content_id(self) -> str | None:
"""Content ID of current playing media.""" """Content ID of current playing media."""
if not self._player.playlist: if not self._player.playlist:
return None return None
if len(self._player.playlist) > 1: if len(self._player.playlist) > 1:
urls = [{"url": track["url"]} for track in self._player.playlist] urls = [{"url": track["url"]} for track in self._player.playlist]
return json.dumps({"index": self._player.current_index, "urls": urls}) return json.dumps({"index": self._player.current_index, "urls": urls})
return self._player.url return str(self._player.url)
@property @property
def media_content_type(self): def media_content_type(self) -> MediaType | None:
"""Content type of current playing media.""" """Content type of current playing media."""
if not self._player.playlist: if not self._player.playlist:
return None return None
@ -312,47 +314,47 @@ class SqueezeBoxEntity(MediaPlayerEntity):
return MediaType.MUSIC return MediaType.MUSIC
@property @property
def media_duration(self): def media_duration(self) -> int | None:
"""Duration of current playing media in seconds.""" """Duration of current playing media in seconds."""
return self._player.duration return int(self._player.duration)
@property @property
def media_position(self): def media_position(self) -> int | None:
"""Position of current playing media in seconds.""" """Position of current playing media in seconds."""
return self._player.time return int(self._player.time)
@property @property
def media_position_updated_at(self): def media_position_updated_at(self) -> datetime | None:
"""Last time status was updated.""" """Last time status was updated."""
return self._last_update return self._last_update
@property @property
def media_image_url(self): def media_image_url(self) -> str | None:
"""Image url of current playing media.""" """Image url of current playing media."""
return self._player.image_url return str(self._player.image_url)
@property @property
def media_title(self): def media_title(self) -> str | None:
"""Title of current playing media.""" """Title of current playing media."""
return self._player.title return str(self._player.title)
@property @property
def media_channel(self): def media_channel(self) -> str | None:
"""Channel (e.g. webradio name) of current playing media.""" """Channel (e.g. webradio name) of current playing media."""
return self._player.remote_title return str(self._player.remote_title)
@property @property
def media_artist(self): def media_artist(self) -> str | None:
"""Artist of current playing media.""" """Artist of current playing media."""
return self._player.artist return str(self._player.artist)
@property @property
def media_album_name(self): def media_album_name(self) -> str | None:
"""Album of current playing media.""" """Album of current playing media."""
return self._player.album return str(self._player.album)
@property @property
def repeat(self): def repeat(self) -> RepeatMode:
"""Repeat setting.""" """Repeat setting."""
if self._player.repeat == "song": if self._player.repeat == "song":
return RepeatMode.ONE return RepeatMode.ONE
@ -361,13 +363,13 @@ class SqueezeBoxEntity(MediaPlayerEntity):
return RepeatMode.OFF return RepeatMode.OFF
@property @property
def shuffle(self): def shuffle(self) -> bool:
"""Boolean if shuffle is enabled.""" """Boolean if shuffle is enabled."""
# Squeezebox has a third shuffle mode (album) not recognized by Home Assistant # Squeezebox has a third shuffle mode (album) not recognized by Home Assistant
return self._player.shuffle == "song" return bool(self._player.shuffle == "song")
@property @property
def group_members(self): def group_members(self) -> list[str]:
"""List players we are synced with.""" """List players we are synced with."""
player_ids = { player_ids = {
p.unique_id: p.entity_id for p in self.hass.data[DOMAIN][KNOWN_PLAYERS] p.unique_id: p.entity_id for p in self.hass.data[DOMAIN][KNOWN_PLAYERS]
@ -379,12 +381,12 @@ class SqueezeBoxEntity(MediaPlayerEntity):
] ]
@property @property
def sync_group(self): def sync_group(self) -> list[str]:
"""List players we are synced with. Deprecated.""" """List players we are synced with. Deprecated."""
return self.group_members return self.group_members
@property @property
def query_result(self): def query_result(self) -> dict | bool:
"""Return the result from the call_query service.""" """Return the result from the call_query service."""
return self._query_result return self._query_result
@ -477,7 +479,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
try: try:
# a saved playlist by number # a saved playlist by number
payload = { payload = {
"search_id": int(media_id), "search_id": media_id,
"search_type": MediaType.PLAYLIST, "search_type": MediaType.PLAYLIST,
} }
playlist = await generate_playlist(self._player, payload) playlist = await generate_playlist(self._player, payload)
@ -519,7 +521,9 @@ class SqueezeBoxEntity(MediaPlayerEntity):
"""Send the media player the command for clear playlist.""" """Send the media player the command for clear playlist."""
await self._player.async_clear_playlist() await self._player.async_clear_playlist()
async def async_call_method(self, command, parameters=None): async def async_call_method(
self, command: str, parameters: list[str] | None = None
) -> None:
"""Call Squeezebox JSON/RPC method. """Call Squeezebox JSON/RPC method.
Additional parameters are added to the command to form the list of Additional parameters are added to the command to form the list of
@ -530,7 +534,9 @@ class SqueezeBoxEntity(MediaPlayerEntity):
all_params.extend(parameters) all_params.extend(parameters)
await self._player.async_query(*all_params) await self._player.async_query(*all_params)
async def async_call_query(self, command, parameters=None): async def async_call_query(
self, command: str, parameters: list[str] | None = None
) -> None:
"""Call Squeezebox JSON/RPC method where we care about the result. """Call Squeezebox JSON/RPC method where we care about the result.
Additional parameters are added to the command to form the list of Additional parameters are added to the command to form the list of
@ -560,7 +566,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
"Could not find player_id for %s. Not syncing", other_player "Could not find player_id for %s. Not syncing", other_player
) )
async def async_sync(self, other_player): async def async_sync(self, other_player: str) -> None:
"""Sync this Squeezebox player to another. Deprecated.""" """Sync this Squeezebox player to another. Deprecated."""
_LOGGER.warning( _LOGGER.warning(
"Service squeezebox.sync is deprecated; use media_player.join_players" "Service squeezebox.sync is deprecated; use media_player.join_players"
@ -572,7 +578,7 @@ class SqueezeBoxEntity(MediaPlayerEntity):
"""Unsync this Squeezebox player.""" """Unsync this Squeezebox player."""
await self._player.async_unsync() await self._player.async_unsync()
async def async_unsync(self): async def async_unsync(self) -> None:
"""Unsync this Squeezebox player. Deprecated.""" """Unsync this Squeezebox player. Deprecated."""
_LOGGER.warning( _LOGGER.warning(
"Service squeezebox.unsync is deprecated; use media_player.unjoin_player" "Service squeezebox.unsync is deprecated; use media_player.unjoin_player"
@ -580,7 +586,11 @@ class SqueezeBoxEntity(MediaPlayerEntity):
) )
await self.async_unjoin_player() await self.async_unjoin_player()
async def async_browse_media(self, media_content_type=None, media_content_id=None): async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Implement the websocket media browsing helper.""" """Implement the websocket media browsing helper."""
_LOGGER.debug( _LOGGER.debug(
"Reached async_browse_media with content_type %s and content_id %s", "Reached async_browse_media with content_type %s and content_id %s",

View File

@ -3916,6 +3916,16 @@ disallow_untyped_defs = true
warn_return_any = true warn_return_any = true
warn_unreachable = true warn_unreachable = true
[mypy-homeassistant.components.squeezebox.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.ssdp.*] [mypy-homeassistant.components.ssdp.*]
check_untyped_defs = true check_untyped_defs = true
disallow_incomplete_defs = true disallow_incomplete_defs = true