Improve type hints in apple_tv media player (#77940)

This commit is contained in:
epenet 2022-09-08 10:46:23 +02:00 committed by GitHub
parent e2568d8375
commit 5276d849ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 100 deletions

View File

@ -64,6 +64,7 @@ omit =
homeassistant/components/anthemav/media_player.py homeassistant/components/anthemav/media_player.py
homeassistant/components/apcupsd/* homeassistant/components/apcupsd/*
homeassistant/components/apple_tv/__init__.py homeassistant/components/apple_tv/__init__.py
homeassistant/components/apple_tv/browse_media.py
homeassistant/components/apple_tv/media_player.py homeassistant/components/apple_tv/media_player.py
homeassistant/components/apple_tv/remote.py homeassistant/components/apple_tv/remote.py
homeassistant/components/aqualogic/* homeassistant/components/aqualogic/*

View File

@ -1,34 +1,29 @@
"""Support for media browsing.""" """Support for media browsing."""
from typing import Any
from homeassistant.components.media_player import BrowseMedia from homeassistant.components.media_player import BrowseMedia, MediaClass, MediaType
from homeassistant.components.media_player.const import (
MEDIA_CLASS_APP,
MEDIA_CLASS_DIRECTORY,
MEDIA_TYPE_APP,
MEDIA_TYPE_APPS,
)
def build_app_list(app_list): def build_app_list(app_list: dict[str, str]) -> BrowseMedia:
"""Create response payload for app list.""" """Create response payload for app list."""
app_list = [ media_list = [
{"app_id": app_id, "title": app_name, "type": MEDIA_TYPE_APP} {"app_id": app_id, "title": app_name, "type": MediaType.APP}
for app_name, app_id in app_list.items() for app_name, app_id in app_list.items()
] ]
return BrowseMedia( return BrowseMedia(
media_class=MEDIA_CLASS_DIRECTORY, media_class=MediaClass.DIRECTORY,
media_content_id="apps", media_content_id="apps",
media_content_type=MEDIA_TYPE_APPS, media_content_type=MediaType.APPS,
title="Apps", title="Apps",
can_play=False, can_play=False,
can_expand=True, can_expand=True,
children=[item_payload(item) for item in app_list], children=[item_payload(item) for item in media_list],
children_media_class=MEDIA_CLASS_APP, children_media_class=MediaClass.APP,
) )
def item_payload(item): def item_payload(item: dict[str, Any]) -> BrowseMedia:
""" """
Create response payload for a single media item. Create response payload for a single media item.
@ -36,8 +31,8 @@ def item_payload(item):
""" """
return BrowseMedia( return BrowseMedia(
title=item["title"], title=item["title"],
media_class=MEDIA_CLASS_APP, media_class=MediaClass.APP,
media_content_type=MEDIA_TYPE_APP, media_content_type=MediaType.APP,
media_content_id=item["app_id"], media_content_id=item["app_id"],
can_play=False, can_play=False,
can_expand=False, can_expand=False,

View File

@ -1,6 +1,7 @@
"""Support for Apple TV media player.""" """Support for Apple TV media player."""
from __future__ import annotations from __future__ import annotations
from datetime import datetime
import logging import logging
from typing import Any from typing import Any
@ -9,45 +10,31 @@ from pyatv.const import (
DeviceState, DeviceState,
FeatureName, FeatureName,
FeatureState, FeatureState,
MediaType, MediaType as AppleMediaType,
PowerState, PowerState,
RepeatState, RepeatState,
ShuffleState, ShuffleState,
) )
from pyatv.helpers import is_streamable from pyatv.helpers import is_streamable
from pyatv.interface import AppleTV, Playing
from homeassistant.components import media_source from homeassistant.components import media_source
from homeassistant.components.media_player import ( from homeassistant.components.media_player import (
BrowseMedia, BrowseMedia,
MediaPlayerEntity, MediaPlayerEntity,
MediaPlayerEntityFeature, MediaPlayerEntityFeature,
) MediaPlayerState,
from homeassistant.components.media_player.browse_media import ( MediaType,
RepeatMode,
async_process_play_media_url, async_process_play_media_url,
) )
from homeassistant.components.media_player.const import (
MEDIA_TYPE_APP,
MEDIA_TYPE_MUSIC,
MEDIA_TYPE_TVSHOW,
MEDIA_TYPE_VIDEO,
REPEAT_MODE_ALL,
REPEAT_MODE_OFF,
REPEAT_MODE_ONE,
)
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ( from homeassistant.const import CONF_NAME
CONF_NAME,
STATE_IDLE,
STATE_OFF,
STATE_PAUSED,
STATE_PLAYING,
STATE_STANDBY,
)
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from . import AppleTVEntity from . import AppleTVEntity, AppleTVManager
from .browse_media import build_app_list from .browse_media import build_app_list
from .const import DOMAIN from .const import DOMAIN
@ -108,8 +95,9 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback, async_add_entities: AddEntitiesCallback,
) -> None: ) -> None:
"""Load Apple TV media player based on a config entry.""" """Load Apple TV media player based on a config entry."""
name = config_entry.data[CONF_NAME] name: str = config_entry.data[CONF_NAME]
manager = hass.data[DOMAIN][config_entry.unique_id] assert config_entry.unique_id is not None
manager: AppleTVManager = hass.data[DOMAIN][config_entry.unique_id]
async_add_entities([AppleTvMediaPlayer(name, config_entry.unique_id, manager)]) async_add_entities([AppleTvMediaPlayer(name, config_entry.unique_id, manager)])
@ -118,14 +106,14 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
_attr_supported_features = SUPPORT_APPLE_TV _attr_supported_features = SUPPORT_APPLE_TV
def __init__(self, name, identifier, manager, **kwargs): def __init__(self, name: str, identifier: str, manager: AppleTVManager) -> None:
"""Initialize the Apple TV media player.""" """Initialize the Apple TV media player."""
super().__init__(name, identifier, manager, **kwargs) super().__init__(name, identifier, manager)
self._playing = None self._playing: Playing | None = None
self._app_list = {} self._app_list: dict[str, str] = {}
@callback @callback
def async_device_connected(self, atv): def async_device_connected(self, atv: AppleTV) -> None:
"""Handle when connection is made to device.""" """Handle when connection is made to device."""
# NB: Do not use _is_feature_available here as it only works when playing # NB: Do not use _is_feature_available here as it only works when playing
if self.atv.features.in_state(FeatureState.Available, FeatureName.PushUpdates): if self.atv.features.in_state(FeatureState.Available, FeatureName.PushUpdates):
@ -153,7 +141,7 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
if self.atv.features.in_state(FeatureState.Available, FeatureName.AppList): if self.atv.features.in_state(FeatureState.Available, FeatureName.AppList):
self.hass.create_task(self._update_app_list()) self.hass.create_task(self._update_app_list())
async def _update_app_list(self): async def _update_app_list(self) -> None:
_LOGGER.debug("Updating app list") _LOGGER.debug("Updating app list")
try: try:
apps = await self.atv.apps.app_list() apps = await self.atv.apps.app_list()
@ -165,127 +153,128 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
self._app_list = { self._app_list = {
app.name: app.identifier app.name: app.identifier
for app in sorted(apps, key=lambda app: app.name.lower()) for app in sorted(apps, key=lambda app: app.name.lower())
if app.name is not None
} }
self.async_write_ha_state() self.async_write_ha_state()
@callback @callback
def async_device_disconnected(self): def async_device_disconnected(self) -> None:
"""Handle when connection was lost to device.""" """Handle when connection was lost to device."""
self._attr_supported_features = SUPPORT_APPLE_TV self._attr_supported_features = SUPPORT_APPLE_TV
@property @property
def state(self): def state(self) -> MediaPlayerState | None:
"""Return the state of the device.""" """Return the state of the device."""
if self.manager.is_connecting: if self.manager.is_connecting:
return None return None
if self.atv is None: if self.atv is None:
return STATE_OFF return MediaPlayerState.OFF
if ( if (
self._is_feature_available(FeatureName.PowerState) self._is_feature_available(FeatureName.PowerState)
and self.atv.power.power_state == PowerState.Off and self.atv.power.power_state == PowerState.Off
): ):
return STATE_STANDBY return MediaPlayerState.STANDBY
if self._playing: if self._playing:
state = self._playing.device_state state = self._playing.device_state
if state in (DeviceState.Idle, DeviceState.Loading): if state in (DeviceState.Idle, DeviceState.Loading):
return STATE_IDLE return MediaPlayerState.IDLE
if state == DeviceState.Playing: if state == DeviceState.Playing:
return STATE_PLAYING return MediaPlayerState.PLAYING
if state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped): if state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped):
return STATE_PAUSED return MediaPlayerState.PAUSED
return STATE_STANDBY # Bad or unknown state? return MediaPlayerState.STANDBY # Bad or unknown state?
return None return None
@callback @callback
def playstatus_update(self, _, playing): def playstatus_update(self, _, playing: Playing) -> None:
"""Print what is currently playing when it changes.""" """Print what is currently playing when it changes."""
self._playing = playing self._playing = playing
self.async_write_ha_state() self.async_write_ha_state()
@callback @callback
def playstatus_error(self, _, exception): def playstatus_error(self, _, exception: Exception) -> None:
"""Inform about an error and restart push updates.""" """Inform about an error and restart push updates."""
_LOGGER.warning("A %s error occurred: %s", exception.__class__, exception) _LOGGER.warning("A %s error occurred: %s", exception.__class__, exception)
self._playing = None self._playing = None
self.async_write_ha_state() self.async_write_ha_state()
@callback @callback
def powerstate_update(self, old_state: PowerState, new_state: PowerState): def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None:
"""Update power state when it changes.""" """Update power state when it changes."""
self.async_write_ha_state() self.async_write_ha_state()
@property @property
def app_id(self): def app_id(self) -> str | None:
"""ID of the current running app.""" """ID of the current running app."""
if self._is_feature_available(FeatureName.App): if self._is_feature_available(FeatureName.App):
return self.atv.metadata.app.identifier return self.atv.metadata.app.identifier
return None return None
@property @property
def app_name(self): def app_name(self) -> str | None:
"""Name of the current running app.""" """Name of the current running app."""
if self._is_feature_available(FeatureName.App): if self._is_feature_available(FeatureName.App):
return self.atv.metadata.app.name return self.atv.metadata.app.name
return None return None
@property @property
def source_list(self): def source_list(self) -> list[str]:
"""List of available input sources.""" """List of available input sources."""
return list(self._app_list.keys()) return list(self._app_list.keys())
@property @property
def media_content_type(self): def media_content_type(self) -> MediaType | None:
"""Content type of current playing media.""" """Content type of current playing media."""
if self._playing: if self._playing:
return { return {
MediaType.Video: MEDIA_TYPE_VIDEO, AppleMediaType.Video: MediaType.VIDEO,
MediaType.Music: MEDIA_TYPE_MUSIC, AppleMediaType.Music: MediaType.MUSIC,
MediaType.TV: MEDIA_TYPE_TVSHOW, AppleMediaType.TV: MediaType.TVSHOW,
}.get(self._playing.media_type) }.get(self._playing.media_type)
return None return None
@property @property
def media_content_id(self): def media_content_id(self) -> str | None:
"""Content ID of current playing media.""" """Content ID of current playing media."""
if self._playing: if self._playing:
return self._playing.content_identifier return self._playing.content_identifier
return None return None
@property @property
def volume_level(self): def volume_level(self) -> float | None:
"""Volume level of the media player (0..1).""" """Volume level of the media player (0..1)."""
if self._is_feature_available(FeatureName.Volume): if self._is_feature_available(FeatureName.Volume):
return self.atv.audio.volume / 100.0 # from percent return self.atv.audio.volume / 100.0 # from percent
return None return None
@property @property
def media_duration(self): def media_duration(self) -> int | None:
"""Duration of current playing media in seconds.""" """Duration of current playing media in seconds."""
if self._playing: if self._playing:
return self._playing.total_time return self._playing.total_time
return None return None
@property @property
def media_position(self): def media_position(self) -> int | None:
"""Position of current playing media in seconds.""" """Position of current playing media in seconds."""
if self._playing: if self._playing:
return self._playing.position return self._playing.position
return None return None
@property @property
def media_position_updated_at(self): def media_position_updated_at(self) -> datetime | None:
"""Last valid time of media position.""" """Last valid time of media position."""
if self.state in (STATE_PLAYING, STATE_PAUSED): if self.state in {MediaPlayerState.PLAYING, MediaPlayerState.PAUSED}:
return dt_util.utcnow() return dt_util.utcnow()
return None return None
async def async_play_media( async def async_play_media(
self, media_type: str, media_id: str, **kwargs: Any self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None: ) -> None:
"""Send the play_media command to the media player.""" """Send the play_media command to the media player."""
# If input (file) has a file format supported by pyatv, then stream it with # If input (file) has a file format supported by pyatv, then stream it with
# RAOP. Otherwise try to play it with regular AirPlay. # RAOP. Otherwise try to play it with regular AirPlay.
if media_type == MEDIA_TYPE_APP: if media_type == MediaType.APP:
await self.atv.apps.launch_app(media_id) await self.atv.apps.launch_app(media_id)
return return
@ -294,10 +283,10 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
self.hass, media_id, self.entity_id self.hass, media_id, self.entity_id
) )
media_id = async_process_play_media_url(self.hass, play_item.url) media_id = async_process_play_media_url(self.hass, play_item.url)
media_type = MEDIA_TYPE_MUSIC media_type = MediaType.MUSIC
if self._is_feature_available(FeatureName.StreamFile) and ( if self._is_feature_available(FeatureName.StreamFile) and (
media_type == MEDIA_TYPE_MUSIC or await is_streamable(media_id) media_type == MediaType.MUSIC or await is_streamable(media_id)
): ):
_LOGGER.debug("Streaming %s via RAOP", media_id) _LOGGER.debug("Streaming %s via RAOP", media_id)
await self.atv.stream.stream_file(media_id) await self.atv.stream.stream_file(media_id)
@ -308,13 +297,13 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
_LOGGER.error("Media streaming is not possible with current configuration") _LOGGER.error("Media streaming is not possible with current configuration")
@property @property
def media_image_hash(self): def media_image_hash(self) -> str | None:
"""Hash value for media image.""" """Hash value for media image."""
state = self.state state = self.state
if ( if (
self._playing self._playing
and self._is_feature_available(FeatureName.Artwork) and self._is_feature_available(FeatureName.Artwork)
and state not in [None, STATE_OFF, STATE_IDLE] and state not in {None, MediaPlayerState.OFF, MediaPlayerState.IDLE}
): ):
return self.atv.metadata.artwork_id return self.atv.metadata.artwork_id
return None return None
@ -322,7 +311,7 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
async def async_get_media_image(self) -> tuple[bytes | None, str | None]: async def async_get_media_image(self) -> tuple[bytes | None, str | None]:
"""Fetch media image of current playing image.""" """Fetch media image of current playing image."""
state = self.state state = self.state
if self._playing and state not in [STATE_OFF, STATE_IDLE]: if self._playing and state not in {MediaPlayerState.OFF, MediaPlayerState.IDLE}:
artwork = await self.atv.metadata.artwork() artwork = await self.atv.metadata.artwork()
if artwork: if artwork:
return artwork.bytes, artwork.mimetype return artwork.bytes, artwork.mimetype
@ -330,65 +319,65 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
return None, None return None, None
@property @property
def media_title(self): def media_title(self) -> str | None:
"""Title of current playing media.""" """Title of current playing media."""
if self._playing: if self._playing:
return self._playing.title return self._playing.title
return None return None
@property @property
def media_artist(self): def media_artist(self) -> str | None:
"""Artist of current playing media, music track only.""" """Artist of current playing media, music track only."""
if self._is_feature_available(FeatureName.Artist): if self._playing and self._is_feature_available(FeatureName.Artist):
return self._playing.artist return self._playing.artist
return None return None
@property @property
def media_album_name(self): def media_album_name(self) -> str | None:
"""Album name of current playing media, music track only.""" """Album name of current playing media, music track only."""
if self._is_feature_available(FeatureName.Album): if self._playing and self._is_feature_available(FeatureName.Album):
return self._playing.album return self._playing.album
return None return None
@property @property
def media_series_title(self): def media_series_title(self) -> str | None:
"""Title of series of current playing media, TV show only.""" """Title of series of current playing media, TV show only."""
if self._is_feature_available(FeatureName.SeriesName): if self._playing and self._is_feature_available(FeatureName.SeriesName):
return self._playing.series_name return self._playing.series_name
return None return None
@property @property
def media_season(self): def media_season(self) -> str | None:
"""Season of current playing media, TV show only.""" """Season of current playing media, TV show only."""
if self._is_feature_available(FeatureName.SeasonNumber): if self._playing and self._is_feature_available(FeatureName.SeasonNumber):
return str(self._playing.season_number) return str(self._playing.season_number)
return None return None
@property @property
def media_episode(self): def media_episode(self) -> str | None:
"""Episode of current playing media, TV show only.""" """Episode of current playing media, TV show only."""
if self._is_feature_available(FeatureName.EpisodeNumber): if self._playing and self._is_feature_available(FeatureName.EpisodeNumber):
return str(self._playing.episode_number) return str(self._playing.episode_number)
return None return None
@property @property
def repeat(self): def repeat(self) -> RepeatMode | None:
"""Return current repeat mode.""" """Return current repeat mode."""
if self._is_feature_available(FeatureName.Repeat): if self._playing and self._is_feature_available(FeatureName.Repeat):
return { return {
RepeatState.Track: REPEAT_MODE_ONE, RepeatState.Track: RepeatMode.ONE,
RepeatState.All: REPEAT_MODE_ALL, RepeatState.All: RepeatMode.ALL,
}.get(self._playing.repeat, REPEAT_MODE_OFF) }.get(self._playing.repeat, RepeatMode.OFF)
return None return None
@property @property
def shuffle(self): def shuffle(self) -> bool | None:
"""Boolean if shuffle is enabled.""" """Boolean if shuffle is enabled."""
if self._is_feature_available(FeatureName.Shuffle): if self._playing and self._is_feature_available(FeatureName.Shuffle):
return self._playing.shuffle != ShuffleState.Off return self._playing.shuffle != ShuffleState.Off
return None return None
def _is_feature_available(self, feature): def _is_feature_available(self, feature: FeatureName) -> bool:
"""Return if a feature is available.""" """Return if a feature is available."""
if self.atv and self._playing: if self.atv and self._playing:
return self.atv.features.in_state(FeatureState.Available, feature) return self.atv.features.in_state(FeatureState.Available, feature)
@ -396,7 +385,7 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
async def async_browse_media( async def async_browse_media(
self, self,
media_content_type: str | None = None, media_content_type: MediaType | str | None = None,
media_content_id: str | None = None, media_content_id: str | None = None,
) -> BrowseMedia: ) -> BrowseMedia:
"""Implement the websocket media browsing helper.""" """Implement the websocket media browsing helper."""
@ -496,12 +485,12 @@ class AppleTvMediaPlayer(AppleTVEntity, MediaPlayerEntity):
# pyatv expects volume in percent # pyatv expects volume in percent
await self.atv.audio.set_volume(volume * 100.0) await self.atv.audio.set_volume(volume * 100.0)
async def async_set_repeat(self, repeat: str) -> None: async def async_set_repeat(self, repeat: RepeatMode) -> None:
"""Set repeat mode.""" """Set repeat mode."""
if self.atv: if self.atv:
mode = { mode = {
REPEAT_MODE_ONE: RepeatState.Track, RepeatMode.ONE: RepeatState.Track,
REPEAT_MODE_ALL: RepeatState.All, RepeatMode.ALL: RepeatState.All,
}.get(repeat, RepeatState.Off) }.get(repeat, RepeatState.Off)
await self.atv.remote_control.set_repeat(mode) await self.atv.remote_control.set_repeat(mode)

View File

@ -127,6 +127,7 @@ from .const import ( # noqa: F401
SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET, SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP, SUPPORT_VOLUME_STEP,
MediaClass,
MediaPlayerEntityFeature, MediaPlayerEntityFeature,
MediaPlayerState, MediaPlayerState,
MediaType, MediaType,