Support multiple Plex servers in media browser (#68321)

This commit is contained in:
jjlawren 2022-03-21 17:48:44 -05:00 committed by GitHub
parent 16655c4ccc
commit 653305b998
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 436 additions and 221 deletions

View File

@ -896,6 +896,7 @@ omit =
homeassistant/components/plaato/const.py homeassistant/components/plaato/const.py
homeassistant/components/plaato/entity.py homeassistant/components/plaato/entity.py
homeassistant/components/plaato/sensor.py homeassistant/components/plaato/sensor.py
homeassistant/components/plex/cast.py
homeassistant/components/plex/media_player.py homeassistant/components/plex/media_player.py
homeassistant/components/plex/view.py homeassistant/components/plex/view.py
homeassistant/components/plugwise/select.py homeassistant/components/plugwise/select.py

View File

@ -69,7 +69,7 @@ async def async_browse_media(hass, media_content_type, media_content_id, platfor
return await hass.async_add_executor_job( return await hass.async_add_executor_job(
partial( partial(
browse_media, browse_media,
plex_server, hass,
is_internal, is_internal,
media_content_type, media_content_type,
media_content_id, media_content_id,

View File

@ -10,8 +10,7 @@ from homeassistant.components.media_player.const import MEDIA_CLASS_APP
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from . import async_browse_media as async_browse_plex_media, is_plex_media_id from . import async_browse_media as async_browse_plex_media, is_plex_media_id
from .const import PLEX_URI_SCHEME from .services import process_plex_payload
from .services import lookup_plex_media
async def async_get_media_browser_root_object( async def async_get_media_browser_root_object(
@ -51,13 +50,10 @@ def _play_media(
hass: HomeAssistant, chromecast: Chromecast, media_type: str, media_id: str hass: HomeAssistant, chromecast: Chromecast, media_type: str, media_id: str
) -> None: ) -> None:
"""Play media.""" """Play media."""
media_id = media_id[len(PLEX_URI_SCHEME) :] result = process_plex_payload(hass, media_type, media_id)
media = lookup_plex_media(hass, media_type, media_id)
if media is None:
return
controller = PlexController() controller = PlexController()
chromecast.register_handler(controller) chromecast.register_handler(controller)
controller.play_media(media) controller.play_media(result.media, offset=result.offset)
async def async_play_media( async def async_play_media(
@ -68,7 +64,7 @@ async def async_play_media(
media_id: str, media_id: str,
) -> bool: ) -> bool:
"""Play media.""" """Play media."""
if media_id and media_id.startswith(PLEX_URI_SCHEME): if is_plex_media_id(media_id):
await hass.async_add_executor_job( await hass.async_add_executor_job(
_play_media, hass, chromecast, media_type, media_id _play_media, hass, chromecast, media_type, media_id
) )

View File

@ -1,7 +1,7 @@
"""Support to interface with the Plex API.""" """Support to interface with the Plex API."""
from __future__ import annotations from __future__ import annotations
import logging from yarl import URL
from homeassistant.components.media_player import BrowseMedia from homeassistant.components.media_player import BrowseMedia
from homeassistant.components.media_player.const import ( from homeassistant.components.media_player.const import (
@ -18,7 +18,7 @@ from homeassistant.components.media_player.const import (
) )
from homeassistant.components.media_player.errors import BrowseError from homeassistant.components.media_player.errors import BrowseError
from .const import DOMAIN, PLEX_URI_SCHEME from .const import DOMAIN, SERVERS
from .errors import MediaNotFound from .errors import MediaNotFound
from .helpers import pretty_title from .helpers import pretty_title
@ -27,16 +27,7 @@ class UnknownMediaType(BrowseError):
"""Unknown media type.""" """Unknown media type."""
HUB_PREFIX = "hub:"
EXPANDABLES = ["album", "artist", "playlist", "season", "show"] EXPANDABLES = ["album", "artist", "playlist", "season", "show"]
PLAYLISTS_BROWSE_PAYLOAD = {
"title": "Playlists",
"media_class": MEDIA_CLASS_DIRECTORY,
"media_content_id": PLEX_URI_SCHEME + "all",
"media_content_type": "playlists",
"can_play": False,
"can_expand": True,
}
ITEM_TYPE_MEDIA_CLASS = { ITEM_TYPE_MEDIA_CLASS = {
"album": MEDIA_CLASS_ALBUM, "album": MEDIA_CLASS_ALBUM,
"artist": MEDIA_CLASS_ARTIST, "artist": MEDIA_CLASS_ARTIST,
@ -52,25 +43,39 @@ ITEM_TYPE_MEDIA_CLASS = {
"video": MEDIA_CLASS_VIDEO, "video": MEDIA_CLASS_VIDEO,
} }
_LOGGER = logging.getLogger(__name__)
def browse_media( # noqa: C901 def browse_media( # noqa: C901
plex_server, is_internal, media_content_type, media_content_id, *, platform=None hass, is_internal, media_content_type, media_content_id, *, platform=None
): ):
"""Implement the websocket media browsing helper.""" """Implement the websocket media browsing helper."""
server_id = None
plex_server = None
special_folder = None
if media_content_id:
url = URL(media_content_id)
server_id = url.host
plex_server = hass.data[DOMAIN][SERVERS][server_id]
if media_content_type == "hub":
_, hub_location, hub_identifier = url.parts
elif media_content_type in ["library", "server"] and len(url.parts) > 2:
_, media_content_id, special_folder = url.parts
else:
media_content_id = url.name
if media_content_type in ("plex_root", None):
return root_payload(hass, is_internal, platform=platform)
def item_payload(item, short_name=False): def item_payload(item, short_name=False):
"""Create response payload for a single media item.""" """Create response payload for a single media item."""
try: try:
media_class = ITEM_TYPE_MEDIA_CLASS[item.type] media_class = ITEM_TYPE_MEDIA_CLASS[item.type]
except KeyError as err: except KeyError as err:
_LOGGER.debug("Unknown type received: %s", item.type) raise UnknownMediaType("Unknown type received: {item.type}") from err
raise UnknownMediaType from err
payload = { payload = {
"title": pretty_title(item, short_name), "title": pretty_title(item, short_name),
"media_class": media_class, "media_class": media_class,
"media_content_id": PLEX_URI_SCHEME + str(item.ratingKey), "media_content_id": generate_plex_uri(server_id, item.ratingKey),
"media_content_type": item.type, "media_content_type": item.type,
"can_play": True, "can_play": True,
"can_expand": item.type in EXPANDABLES, "can_expand": item.type in EXPANDABLES,
@ -81,16 +86,41 @@ def browse_media( # noqa: C901
thumbnail = item.thumbUrl thumbnail = item.thumbUrl
else: else:
thumbnail = get_proxy_image_url( thumbnail = get_proxy_image_url(
plex_server.machine_identifier, server_id,
item.ratingKey, item.ratingKey,
) )
payload["thumbnail"] = thumbnail payload["thumbnail"] = thumbnail
return BrowseMedia(**payload) return BrowseMedia(**payload)
def library_payload(library_id): def server_payload():
"""Create response payload to describe libraries of the Plex server."""
server_info = BrowseMedia(
title=plex_server.friendly_name,
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=generate_plex_uri(server_id, "server"),
media_content_type="server",
can_play=False,
can_expand=True,
children=[],
children_media_class=MEDIA_CLASS_DIRECTORY,
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
)
if platform != "sonos":
server_info.children.append(
special_library_payload(server_info, "Recommended")
)
for library in plex_server.library.sections():
if library.type == "photo":
continue
if library.type != "artist" and platform == "sonos":
continue
server_info.children.append(library_section_payload(library))
server_info.children.append(playlists_payload())
return server_info
def library_contents(library):
"""Create response payload to describe contents of a specific library.""" """Create response payload to describe contents of a specific library."""
library = plex_server.library.sectionByID(library_id)
library_info = library_section_payload(library) library_info = library_section_payload(library)
library_info.children = [special_library_payload(library_info, "Recommended")] library_info.children = [special_library_payload(library_info, "Recommended")]
for item in library.all(): for item in library.all():
@ -100,9 +130,17 @@ def browse_media( # noqa: C901
continue continue
return library_info return library_info
def playlists_payload(platform): def playlists_payload():
"""Create response payload for all available playlists.""" """Create response payload for all available playlists."""
playlists_info = {**PLAYLISTS_BROWSE_PAYLOAD, "children": []} playlists_info = {
"title": "Playlists",
"media_class": MEDIA_CLASS_DIRECTORY,
"media_content_id": generate_plex_uri(server_id, "all"),
"media_content_type": "playlists",
"can_play": False,
"can_expand": True,
"children": [],
}
for playlist in plex_server.playlists(): for playlist in plex_server.playlists():
if playlist.playlistType != "audio" and platform == "sonos": if playlist.playlistType != "audio" and platform == "sonos":
continue continue
@ -137,35 +175,29 @@ def browse_media( # noqa: C901
continue continue
return media_info return media_info
if media_content_id: if media_content_type == "hub":
assert media_content_id.startswith(PLEX_URI_SCHEME) if hub_location == "server":
media_content_id = media_content_id[len(PLEX_URI_SCHEME) :]
if media_content_id and media_content_id.startswith(HUB_PREFIX):
media_content_id = media_content_id[len(HUB_PREFIX) :]
location, hub_identifier = media_content_id.split(":")
if location == "server":
hub = next( hub = next(
x x
for x in plex_server.library.hubs() for x in plex_server.library.hubs()
if x.hubIdentifier == hub_identifier if x.hubIdentifier == hub_identifier
) )
media_content_id = f"{HUB_PREFIX}server:{hub.hubIdentifier}" media_content_id = f"server/{hub.hubIdentifier}"
else: else:
library_section = plex_server.library.sectionByID(int(location)) library_section = plex_server.library.sectionByID(int(hub_location))
hub = next( hub = next(
x for x in library_section.hubs() if x.hubIdentifier == hub_identifier x for x in library_section.hubs() if x.hubIdentifier == hub_identifier
) )
media_content_id = f"{HUB_PREFIX}{hub.librarySectionID}:{hub.hubIdentifier}" media_content_id = f"{hub.librarySectionID}/{hub.hubIdentifier}"
try: try:
children_media_class = ITEM_TYPE_MEDIA_CLASS[hub.type] children_media_class = ITEM_TYPE_MEDIA_CLASS[hub.type]
except KeyError as err: except KeyError as err:
raise BrowseError(f"Unknown type received: {hub.type}") from err raise UnknownMediaType(f"Unknown type received: {hub.type}") from err
payload = { payload = {
"title": hub.title, "title": hub.title,
"media_class": MEDIA_CLASS_DIRECTORY, "media_class": MEDIA_CLASS_DIRECTORY,
"media_content_id": PLEX_URI_SCHEME + media_content_id, "media_content_id": generate_plex_uri(server_id, media_content_id),
"media_content_type": hub.type, "media_content_type": "hub",
"can_play": False, "can_play": False,
"can_expand": True, "can_expand": True,
"children": [], "children": [],
@ -180,11 +212,6 @@ def browse_media( # noqa: C901
payload["children"].append(item_payload(item)) payload["children"].append(item_payload(item))
return BrowseMedia(**payload) return BrowseMedia(**payload)
if media_content_id and ":" in media_content_id:
media_content_id, special_folder = media_content_id.split(":")
else:
special_folder = None
if special_folder: if special_folder:
if media_content_type == "server": if media_content_type == "server":
library_or_section = plex_server.library library_or_section = plex_server.library
@ -196,7 +223,7 @@ def browse_media( # noqa: C901
try: try:
children_media_class = ITEM_TYPE_MEDIA_CLASS[library_or_section.TYPE] children_media_class = ITEM_TYPE_MEDIA_CLASS[library_or_section.TYPE]
except KeyError as err: except KeyError as err:
raise BrowseError( raise UnknownMediaType(
f"Unknown type received: {library_or_section.TYPE}" f"Unknown type received: {library_or_section.TYPE}"
) from err ) from err
else: else:
@ -207,8 +234,9 @@ def browse_media( # noqa: C901
payload = { payload = {
"title": title, "title": title,
"media_class": MEDIA_CLASS_DIRECTORY, "media_class": MEDIA_CLASS_DIRECTORY,
"media_content_id": PLEX_URI_SCHEME "media_content_id": generate_plex_uri(
+ f"{media_content_id}:{special_folder}", server_id, f"{media_content_id}/{special_folder}"
),
"media_content_type": media_content_type, "media_content_type": media_content_type,
"can_play": False, "can_play": False,
"can_expand": True, "can_expand": True,
@ -225,11 +253,13 @@ def browse_media( # noqa: C901
return BrowseMedia(**payload) return BrowseMedia(**payload)
try: try:
if media_content_type in ("server", None): if media_content_type == "server":
return server_payload(plex_server, platform) return server_payload()
if media_content_type == "library": if media_content_type == "library":
return library_payload(int(media_content_id)) library_id = int(media_content_id)
library = plex_server.library.sectionByID(library_id)
return library_contents(library)
except UnknownMediaType as err: except UnknownMediaType as err:
raise BrowseError( raise BrowseError(
@ -237,7 +267,7 @@ def browse_media( # noqa: C901
) from err ) from err
if media_content_type == "playlists": if media_content_type == "playlists":
return playlists_payload(platform) return playlists_payload()
payload = { payload = {
"media_type": DOMAIN, "media_type": DOMAIN,
@ -249,17 +279,61 @@ def browse_media( # noqa: C901
return response return response
def generate_plex_uri(server_id, media_id):
"""Create a media_content_id URL for playable Plex media."""
if isinstance(media_id, int):
media_id = str(media_id)
if isinstance(media_id, str) and not media_id.startswith("/"):
media_id = f"/{media_id}"
return str(
URL.build(
scheme=DOMAIN,
host=server_id,
path=media_id,
)
)
def root_payload(hass, is_internal, platform=None):
"""Return root payload for Plex."""
children = []
for server_id in hass.data[DOMAIN][SERVERS]:
children.append(
browse_media(
hass,
is_internal,
"server",
generate_plex_uri(server_id, ""),
platform=platform,
)
)
if len(children) == 1:
return children[0]
return BrowseMedia(
title="Plex",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="",
media_content_type="plex_root",
can_play=False,
can_expand=True,
children=children,
)
def library_section_payload(section): def library_section_payload(section):
"""Create response payload for a single library section.""" """Create response payload for a single library section."""
try: try:
children_media_class = ITEM_TYPE_MEDIA_CLASS[section.TYPE] children_media_class = ITEM_TYPE_MEDIA_CLASS[section.TYPE]
except KeyError as err: except KeyError as err:
_LOGGER.debug("Unknown type received: %s", section.TYPE) raise UnknownMediaType(f"Unknown type received: {section.TYPE}") from err
raise UnknownMediaType from err server_id = section._server.machineIdentifier # pylint: disable=protected-access
return BrowseMedia( return BrowseMedia(
title=section.title, title=section.title,
media_class=MEDIA_CLASS_DIRECTORY, media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=PLEX_URI_SCHEME + str(section.key), media_content_id=generate_plex_uri(server_id, section.key),
media_content_type="library", media_content_type="library",
can_play=False, can_play=False,
can_expand=True, can_expand=True,
@ -270,10 +344,11 @@ def library_section_payload(section):
def special_library_payload(parent_payload, special_type): def special_library_payload(parent_payload, special_type):
"""Create response payload for special library folders.""" """Create response payload for special library folders."""
title = f"{special_type} ({parent_payload.title})" title = f"{special_type} ({parent_payload.title})"
special_library_id = f"{parent_payload.media_content_id}/{special_type}"
return BrowseMedia( return BrowseMedia(
title=title, title=title,
media_class=parent_payload.media_class, media_class=parent_payload.media_class,
media_content_id=f"{parent_payload.media_content_id}:{special_type}", media_content_id=special_library_id,
media_content_type=parent_payload.media_content_type, media_content_type=parent_payload.media_content_type,
can_play=False, can_play=False,
can_expand=True, can_expand=True,
@ -281,41 +356,18 @@ def special_library_payload(parent_payload, special_type):
) )
def server_payload(plex_server, platform):
"""Create response payload to describe libraries of the Plex server."""
server_info = BrowseMedia(
title=plex_server.friendly_name,
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=PLEX_URI_SCHEME + plex_server.machine_identifier,
media_content_type="server",
can_play=False,
can_expand=True,
children=[],
children_media_class=MEDIA_CLASS_DIRECTORY,
)
if platform != "sonos":
server_info.children.append(special_library_payload(server_info, "Recommended"))
for library in plex_server.library.sections():
if library.type == "photo":
continue
if library.type != "artist" and platform == "sonos":
continue
server_info.children.append(library_section_payload(library))
server_info.children.append(BrowseMedia(**PLAYLISTS_BROWSE_PAYLOAD))
return server_info
def hub_payload(hub): def hub_payload(hub):
"""Create response payload for a hub.""" """Create response payload for a hub."""
if hasattr(hub, "librarySectionID"): if hasattr(hub, "librarySectionID"):
media_content_id = f"{HUB_PREFIX}{hub.librarySectionID}:{hub.hubIdentifier}" media_content_id = f"{hub.librarySectionID}/{hub.hubIdentifier}"
else: else:
media_content_id = f"{HUB_PREFIX}server:{hub.hubIdentifier}" media_content_id = f"server/{hub.hubIdentifier}"
server_id = hub._server.machineIdentifier # pylint: disable=protected-access
payload = { payload = {
"title": hub.title, "title": hub.title,
"media_class": MEDIA_CLASS_DIRECTORY, "media_class": MEDIA_CLASS_DIRECTORY,
"media_content_id": PLEX_URI_SCHEME + media_content_id, "media_content_id": generate_plex_uri(server_id, media_content_id),
"media_content_type": hub.type, "media_content_type": "hub",
"can_play": False, "can_play": False,
"can_expand": True, "can_expand": True,
} }
@ -324,10 +376,11 @@ def hub_payload(hub):
def station_payload(station): def station_payload(station):
"""Create response payload for a music station.""" """Create response payload for a music station."""
server_id = station._server.machineIdentifier # pylint: disable=protected-access
return BrowseMedia( return BrowseMedia(
title=station.title, title=station.title,
media_class=ITEM_TYPE_MEDIA_CLASS[station.type], media_class=ITEM_TYPE_MEDIA_CLASS[station.type],
media_content_id=PLEX_URI_SCHEME + station.key, media_content_id=generate_plex_uri(server_id, station.key),
media_content_type="station", media_content_type="station",
can_play=True, can_play=True,
can_expand=False, can_expand=False,

View File

@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
from functools import wraps from functools import wraps
import json
import logging import logging
import plexapi.exceptions import plexapi.exceptions
@ -46,12 +45,11 @@ from .const import (
PLEX_UPDATE_MEDIA_PLAYER_SESSION_SIGNAL, PLEX_UPDATE_MEDIA_PLAYER_SESSION_SIGNAL,
PLEX_UPDATE_MEDIA_PLAYER_SIGNAL, PLEX_UPDATE_MEDIA_PLAYER_SIGNAL,
PLEX_UPDATE_SENSOR_SIGNAL, PLEX_UPDATE_SENSOR_SIGNAL,
PLEX_URI_SCHEME,
SERVERS, SERVERS,
TRANSIENT_DEVICE_MODELS, TRANSIENT_DEVICE_MODELS,
) )
from .errors import MediaNotFound
from .media_browser import browse_media from .media_browser import browse_media
from .services import process_plex_payload
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -483,51 +481,13 @@ class PlexMediaPlayer(MediaPlayerEntity):
f"Client is not currently accepting playback controls: {self.name}" f"Client is not currently accepting playback controls: {self.name}"
) )
if not self.plex_server.has_token: result = process_plex_payload(
_LOGGER.warning( self.hass, media_type, media_id, default_plex_server=self.plex_server
"Plex integration configured without a token, playback may fail" )
) _LOGGER.debug("Attempting to play %s on %s", result.media, self.name)
if media_id.startswith(PLEX_URI_SCHEME):
media_id = media_id[len(PLEX_URI_SCHEME) :]
if media_type == "station":
playqueue = self.plex_server.create_station_playqueue(media_id)
try:
self.device.playMedia(playqueue)
except requests.exceptions.ConnectTimeout as exc:
raise HomeAssistantError(
f"Request failed when playing on {self.name}"
) from exc
return
src = json.loads(media_id)
if isinstance(src, int):
src = {"plex_key": src}
offset = 0
if playqueue_id := src.pop("playqueue_id", None):
try:
playqueue = self.plex_server.get_playqueue(playqueue_id)
except plexapi.exceptions.NotFound as err:
raise MediaNotFound(
f"PlayQueue '{playqueue_id}' could not be found"
) from err
else:
shuffle = src.pop("shuffle", 0)
offset = src.pop("offset", 0) * 1000
resume = src.pop("resume", False)
media = self.plex_server.lookup_media(media_type, **src)
if resume and not offset:
offset = media.viewOffset
_LOGGER.debug("Attempting to play %s on %s", media, self.name)
playqueue = self.plex_server.create_playqueue(media, shuffle=shuffle)
try: try:
self.device.playMedia(playqueue, offset=offset) self.device.playMedia(result.media, offset=result.offset)
except requests.exceptions.ConnectTimeout as exc: except requests.exceptions.ConnectTimeout as exc:
raise HomeAssistantError( raise HomeAssistantError(
f"Request failed when playing on {self.name}" f"Request failed when playing on {self.name}"
@ -578,7 +538,7 @@ class PlexMediaPlayer(MediaPlayerEntity):
is_internal = is_internal_request(self.hass) is_internal = is_internal_request(self.hass)
return await self.hass.async_add_executor_job( return await self.hass.async_add_executor_job(
browse_media, browse_media,
self.plex_server, self.hass,
is_internal, is_internal,
media_content_type, media_content_type,
media_content_id, media_content_id,

View File

@ -1,4 +1,5 @@
"""Models to represent various Plex objects used in the integration.""" """Models to represent various Plex objects used in the integration."""
from distutils.util import strtobool
import logging import logging
from homeassistant.components.media_player.const import ( from homeassistant.components.media_player.const import (
@ -141,3 +142,35 @@ class PlexSession:
thumb_url = media.url(media.art) thumb_url = media.url(media.art)
return thumb_url return thumb_url
class PlexMediaSearchResult:
"""Represents results from a Plex media media_content_id search.
Results are used by media_player.play_media implementations.
"""
def __init__(self, media, params=None) -> None:
"""Initialize the result."""
self.media = media
self._params = params or {}
@property
def offset(self) -> int:
"""Provide the appropriate offset based on payload contents."""
if offset := self._params.get("offset", 0):
return offset * 1000
resume = self._params.get("resume", False)
if isinstance(resume, str):
resume = bool(strtobool(resume))
if resume:
return self.media.viewOffset
return 0
@property
def shuffle(self) -> bool:
"""Return value of shuffle parameter."""
shuffle = self._params.get("shuffle", False)
if isinstance(shuffle, str):
shuffle = bool(strtobool(shuffle))
return shuffle

View File

@ -4,6 +4,7 @@ import logging
from plexapi.exceptions import NotFound from plexapi.exceptions import NotFound
import voluptuous as vol import voluptuous as vol
from yarl import URL
from homeassistant.core import HomeAssistant, ServiceCall from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
@ -12,11 +13,13 @@ from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import ( from .const import (
DOMAIN, DOMAIN,
PLEX_UPDATE_PLATFORMS_SIGNAL, PLEX_UPDATE_PLATFORMS_SIGNAL,
PLEX_URI_SCHEME,
SERVERS, SERVERS,
SERVICE_REFRESH_LIBRARY, SERVICE_REFRESH_LIBRARY,
SERVICE_SCAN_CLIENTS, SERVICE_SCAN_CLIENTS,
) )
from .errors import MediaNotFound from .errors import MediaNotFound
from .models import PlexMediaSearchResult
REFRESH_LIBRARY_SCHEMA = vol.Schema( REFRESH_LIBRARY_SCHEMA = vol.Schema(
{vol.Optional("server_name"): str, vol.Required("library_name"): str} {vol.Optional("server_name"): str, vol.Required("library_name"): str}
@ -73,7 +76,7 @@ def refresh_library(hass: HomeAssistant, service_call: ServiceCall) -> None:
library.update() library.update()
def get_plex_server(hass, plex_server_name=None): def get_plex_server(hass, plex_server_name=None, plex_server_id=None):
"""Retrieve a configured Plex server by name.""" """Retrieve a configured Plex server by name."""
if DOMAIN not in hass.data: if DOMAIN not in hass.data:
raise HomeAssistantError("Plex integration not configured") raise HomeAssistantError("Plex integration not configured")
@ -81,6 +84,9 @@ def get_plex_server(hass, plex_server_name=None):
if not plex_servers: if not plex_servers:
raise HomeAssistantError("No Plex servers available") raise HomeAssistantError("No Plex servers available")
if plex_server_id:
return hass.data[DOMAIN][SERVERS][plex_server_id]
if plex_server_name: if plex_server_name:
plex_server = next( plex_server = next(
(x for x in plex_servers if x.friendly_name == plex_server_name), None (x for x in plex_servers if x.friendly_name == plex_server_name), None
@ -101,30 +107,69 @@ def get_plex_server(hass, plex_server_name=None):
) )
def lookup_plex_media(hass, content_type, content_id): def process_plex_payload(
"""Look up Plex media for other integrations using media_player.play_media service payloads.""" hass, content_type, content_id, default_plex_server=None, supports_playqueues=True
content = json.loads(content_id) ) -> PlexMediaSearchResult:
"""Look up Plex media using media_player.play_media service payloads."""
plex_server = default_plex_server
if content_id.startswith(PLEX_URI_SCHEME + "{"):
# Handle the special payload of 'plex://{<json>}'
content_id = content_id[len(PLEX_URI_SCHEME) :]
content = json.loads(content_id)
elif content_id.startswith(PLEX_URI_SCHEME):
# Handle standard media_browser payloads
plex_url = URL(content_id)
if plex_url.name:
if len(plex_url.parts) == 2:
# The path contains a single item, will always be a ratingKey
content = int(plex_url.name)
else:
# For "special" items like radio stations
content = plex_url.path
server_id = plex_url.host
plex_server = get_plex_server(hass, plex_server_id=server_id)
else:
# Handle legacy payloads without server_id in URL host position
content = int(plex_url.host) # type: ignore[arg-type]
else:
content = json.loads(content_id)
if isinstance(content, dict):
if plex_server_name := content.pop("plex_server", None):
plex_server = get_plex_server(hass, plex_server_name)
if not plex_server:
plex_server = get_plex_server(hass)
if content_type == "station":
if not supports_playqueues:
raise HomeAssistantError("Plex stations are not supported on this device")
playqueue = plex_server.create_station_playqueue(content)
return PlexMediaSearchResult(playqueue)
if isinstance(content, int): if isinstance(content, int):
content = {"plex_key": content} content = {"plex_key": content}
content_type = DOMAIN content_type = DOMAIN
plex_server_name = content.pop("plex_server", None)
plex_server = get_plex_server(hass, plex_server_name)
if playqueue_id := content.pop("playqueue_id", None): if playqueue_id := content.pop("playqueue_id", None):
if not supports_playqueues:
raise HomeAssistantError("Plex playqueues are not supported on this device")
try: try:
playqueue = plex_server.get_playqueue(playqueue_id) playqueue = plex_server.get_playqueue(playqueue_id)
except NotFound as err: except NotFound as err:
raise MediaNotFound( raise MediaNotFound(
f"PlayQueue '{playqueue_id}' could not be found" f"PlayQueue '{playqueue_id}' could not be found"
) from err ) from err
return playqueue return PlexMediaSearchResult(playqueue, content)
shuffle = content.pop("shuffle", 0) search_query = content.copy()
media = plex_server.lookup_media(content_type, **content) shuffle = search_query.pop("shuffle", 0)
if shuffle: media = plex_server.lookup_media(content_type, **search_query)
return plex_server.create_playqueue(media, shuffle=shuffle)
return media if supports_playqueues and (isinstance(media, list) or shuffle):
playqueue = plex_server.create_playqueue(media, shuffle=shuffle)
return PlexMediaSearchResult(playqueue, content)
return PlexMediaSearchResult(media, content)

View File

@ -3,7 +3,6 @@ from __future__ import annotations
from asyncio import run_coroutine_threadsafe from asyncio import run_coroutine_threadsafe
import datetime import datetime
import json
import logging import logging
from typing import Any from typing import Any
@ -50,7 +49,7 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_SET, SUPPORT_VOLUME_SET,
) )
from homeassistant.components.plex.const import PLEX_URI_SCHEME from homeassistant.components.plex.const import PLEX_URI_SCHEME
from homeassistant.components.plex.services import lookup_plex_media from homeassistant.components.plex.services import process_plex_payload
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_TIME, STATE_IDLE, STATE_PAUSED, STATE_PLAYING from homeassistant.const import ATTR_TIME, STATE_IDLE, STATE_PAUSED, STATE_PLAYING
from homeassistant.core import HomeAssistant, ServiceCall, callback from homeassistant.core import HomeAssistant, ServiceCall, callback
@ -567,20 +566,16 @@ class SonosMediaPlayerEntity(SonosEntity, MediaPlayerEntity):
soco = self.coordinator.soco soco = self.coordinator.soco
if media_id and media_id.startswith(PLEX_URI_SCHEME): if media_id and media_id.startswith(PLEX_URI_SCHEME):
plex_plugin = self.speaker.plex_plugin plex_plugin = self.speaker.plex_plugin
media_id = media_id[len(PLEX_URI_SCHEME) :] result = process_plex_payload(
payload = json.loads(media_id) self.hass, media_type, media_id, supports_playqueues=False
if isinstance(payload, dict): )
shuffle = payload.pop("shuffle", False) if result.shuffle:
else:
shuffle = False
media = lookup_plex_media(self.hass, media_type, json.dumps(payload))
if shuffle:
self.set_shuffle(True) self.set_shuffle(True)
if kwargs.get(ATTR_MEDIA_ENQUEUE): if kwargs.get(ATTR_MEDIA_ENQUEUE):
plex_plugin.add_to_queue(media) plex_plugin.add_to_queue(result.media)
else: else:
soco.clear_queue() soco.clear_queue()
plex_plugin.add_to_queue(media) plex_plugin.add_to_queue(result.media)
soco.play_from_queue(0) soco.play_from_queue(0)
return return

View File

@ -1,6 +1,8 @@
"""Tests for Plex media browser.""" """Tests for Plex media browser."""
from http import HTTPStatus from http import HTTPStatus
from unittest.mock import patch from unittest.mock import Mock, patch
from yarl import URL
from homeassistant.components.media_player.const import ( from homeassistant.components.media_player.const import (
ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_ID,
@ -104,6 +106,7 @@ class MockPlexStation:
title = "Radio Station" title = "Radio Station"
radio = True radio = True
type = "playlist" type = "playlist"
_server = Mock(machineIdentifier="unique_id_123")
async def test_browse_media( async def test_browse_media(
@ -138,7 +141,7 @@ async def test_browse_media(
assert result[ATTR_MEDIA_CONTENT_TYPE] == "server" assert result[ATTR_MEDIA_CONTENT_TYPE] == "server"
assert ( assert (
result[ATTR_MEDIA_CONTENT_ID] result[ATTR_MEDIA_CONTENT_ID]
== PLEX_URI_SCHEME + DEFAULT_DATA[CONF_SERVER_IDENTIFIER] == PLEX_URI_SCHEME + DEFAULT_DATA[CONF_SERVER_IDENTIFIER] + "/server"
) )
# Library Sections + Recommended + Playlists # Library Sections + Recommended + Playlists
assert len(result["children"]) == len(mock_plex_server.library.sections()) + 2 assert len(result["children"]) == len(mock_plex_server.library.sections()) + 2
@ -162,7 +165,7 @@ async def test_browse_media(
"entity_id": media_players[0], "entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: "server", ATTR_MEDIA_CONTENT_TYPE: "server",
ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME
+ f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}:{special_keys[0]}", + f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}/server/{special_keys[0]}",
} }
) )
@ -174,13 +177,14 @@ async def test_browse_media(
assert result[ATTR_MEDIA_CONTENT_TYPE] == "server" assert result[ATTR_MEDIA_CONTENT_TYPE] == "server"
assert ( assert (
result[ATTR_MEDIA_CONTENT_ID] result[ATTR_MEDIA_CONTENT_ID]
== PLEX_URI_SCHEME + f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}:{special_keys[0]}" == PLEX_URI_SCHEME
+ f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}/server/{special_keys[0]}"
) )
assert len(result["children"]) == 4 # Hardcoded in fixture assert len(result["children"]) == 4 # Hardcoded in fixture
assert result["children"][0]["media_content_type"] == "mixed" assert result["children"][0]["media_content_type"] == "hub"
assert result["children"][1]["media_content_type"] == "album" assert result["children"][1]["media_content_type"] == "hub"
assert result["children"][2]["media_content_type"] == "clip" assert result["children"][2]["media_content_type"] == "hub"
assert result["children"][3]["media_content_type"] == "playlist" assert result["children"][3]["media_content_type"] == "hub"
# Browse into a special folder (server): Continue Watching # Browse into a special folder (server): Continue Watching
msg_id += 1 msg_id += 1
@ -199,7 +203,7 @@ async def test_browse_media(
assert msg["type"] == TYPE_RESULT assert msg["type"] == TYPE_RESULT
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "mixed" assert result[ATTR_MEDIA_CONTENT_TYPE] == "hub"
requests_mock.get( requests_mock.get(
f"{mock_plex_server.url_in_use}/hubs/sections/3?includeStations=1", f"{mock_plex_server.url_in_use}/hubs/sections/3?includeStations=1",
@ -216,7 +220,7 @@ async def test_browse_media(
"entity_id": media_players[0], "entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: "library", ATTR_MEDIA_CONTENT_TYPE: "library",
ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME
+ f"{library_section_id}:{special_keys[0]}", + f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}/{library_section_id}/{special_keys[0]}",
} }
) )
@ -228,7 +232,8 @@ async def test_browse_media(
assert result[ATTR_MEDIA_CONTENT_TYPE] == "library" assert result[ATTR_MEDIA_CONTENT_TYPE] == "library"
assert ( assert (
result[ATTR_MEDIA_CONTENT_ID] result[ATTR_MEDIA_CONTENT_ID]
== PLEX_URI_SCHEME + f"{library_section_id}:{special_keys[0]}" == PLEX_URI_SCHEME
+ f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}/{library_section_id}/{special_keys[0]}"
) )
assert len(result["children"]) == 1 assert len(result["children"]) == 1
@ -249,7 +254,7 @@ async def test_browse_media(
assert msg["type"] == TYPE_RESULT assert msg["type"] == TYPE_RESULT
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "station" assert result[ATTR_MEDIA_CONTENT_TYPE] == "hub"
assert len(result["children"]) == 3 assert len(result["children"]) == 3
assert result["children"][0]["title"] == "Library Radio" assert result["children"][0]["title"] == "Library Radio"
@ -271,7 +276,7 @@ async def test_browse_media(
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "library" assert result[ATTR_MEDIA_CONTENT_TYPE] == "library"
result_id = int(result[ATTR_MEDIA_CONTENT_ID][len(PLEX_URI_SCHEME) :]) result_id = int(URL(result[ATTR_MEDIA_CONTENT_ID]).name)
# All items in section + Hubs # All items in section + Hubs
assert ( assert (
len(result["children"]) len(result["children"])
@ -305,7 +310,7 @@ async def test_browse_media(
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "show" assert result[ATTR_MEDIA_CONTENT_TYPE] == "show"
result_id = int(result[ATTR_MEDIA_CONTENT_ID][len(PLEX_URI_SCHEME) :]) result_id = int(URL(result[ATTR_MEDIA_CONTENT_ID]).name)
assert result["title"] == mock_plex_server.fetch_item(result_id).title assert result["title"] == mock_plex_server.fetch_item(result_id).title
assert result["children"][0]["title"] == f"{mock_season.title} ({mock_season.year})" assert result["children"][0]["title"] == f"{mock_season.title} ({mock_season.year})"
@ -335,7 +340,7 @@ async def test_browse_media(
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "season" assert result[ATTR_MEDIA_CONTENT_TYPE] == "season"
result_id = int(result[ATTR_MEDIA_CONTENT_ID][len(PLEX_URI_SCHEME) :]) result_id = int(URL(result[ATTR_MEDIA_CONTENT_ID]).name)
assert ( assert (
result["title"] result["title"]
== f"{mock_season.parentTitle} - {mock_season.title} ({mock_season.year})" == f"{mock_season.parentTitle} - {mock_season.title} ({mock_season.year})"
@ -360,7 +365,7 @@ async def test_browse_media(
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
result_id = int(result[ATTR_MEDIA_CONTENT_ID][len(PLEX_URI_SCHEME) :]) result_id = int(URL(result[ATTR_MEDIA_CONTENT_ID]).name)
assert result[ATTR_MEDIA_CONTENT_TYPE] == "library" assert result[ATTR_MEDIA_CONTENT_TYPE] == "library"
assert result["title"] == "Music" assert result["title"] == "Music"
@ -390,7 +395,7 @@ async def test_browse_media(
assert mock_fetch.called assert mock_fetch.called
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
result_id = int(result[ATTR_MEDIA_CONTENT_ID][len(PLEX_URI_SCHEME) :]) result_id = int(URL(result[ATTR_MEDIA_CONTENT_ID]).name)
assert result[ATTR_MEDIA_CONTENT_TYPE] == "artist" assert result[ATTR_MEDIA_CONTENT_TYPE] == "artist"
assert result["title"] == mock_artist.title assert result["title"] == mock_artist.title
assert result["children"][0]["title"] == "Radio Station" assert result["children"][0]["title"] == "Radio Station"
@ -411,7 +416,7 @@ async def test_browse_media(
assert msg["success"] assert msg["success"]
result = msg["result"] result = msg["result"]
result_id = int(result[ATTR_MEDIA_CONTENT_ID][len(PLEX_URI_SCHEME) :]) result_id = int(URL(result[ATTR_MEDIA_CONTENT_ID]).name)
assert result[ATTR_MEDIA_CONTENT_TYPE] == "album" assert result[ATTR_MEDIA_CONTENT_TYPE] == "album"
assert ( assert (
result["title"] result["title"]

View File

@ -1,6 +1,6 @@
"""Tests for Plex player playback methods/services.""" """Tests for Plex player playback methods/services."""
from http import HTTPStatus from http import HTTPStatus
from unittest.mock import patch from unittest.mock import Mock, patch
import pytest import pytest
@ -11,14 +11,19 @@ from homeassistant.components.media_player.const import (
MEDIA_TYPE_MOVIE, MEDIA_TYPE_MOVIE,
SERVICE_PLAY_MEDIA, SERVICE_PLAY_MEDIA,
) )
from homeassistant.components.plex.const import CONF_SERVER_IDENTIFIER, PLEX_URI_SCHEME
from homeassistant.const import ATTR_ENTITY_ID from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from .const import DEFAULT_DATA, PLEX_DIRECT_URL
class MockPlexMedia: class MockPlexMedia:
"""Minimal mock of plexapi media object.""" """Minimal mock of plexapi media object."""
key = "key" key = "key"
viewOffset = 333
_server = Mock(_baseurl=PLEX_DIRECT_URL)
def __init__(self, title, mediatype): def __init__(self, title, mediatype):
"""Initialize the instance.""" """Initialize the instance."""
@ -51,7 +56,9 @@ async def test_media_player_playback(
media_player = "media_player.plex_plex_web_chrome" media_player = "media_player.plex_plex_web_chrome"
requests_mock.post("/playqueues", text=playqueue_created) requests_mock.post("/playqueues", text=playqueue_created)
requests_mock.get("/player/playback/playMedia", status_code=HTTPStatus.OK) playmedia_mock = requests_mock.get(
"/player/playback/playMedia", status_code=HTTPStatus.OK
)
# Test media lookup failure # Test media lookup failure
payload = '{"library_name": "Movies", "title": "Movie 1" }' payload = '{"library_name": "Movies", "title": "Movie 1" }'
@ -67,6 +74,7 @@ async def test_media_player_playback(
}, },
True, True,
) )
assert not playmedia_mock.called
assert f"No {MEDIA_TYPE_MOVIE} results in 'Movies' for" in str(excinfo.value) assert f"No {MEDIA_TYPE_MOVIE} results in 'Movies' for" in str(excinfo.value)
movie1 = MockPlexMedia("Movie", "movie") movie1 = MockPlexMedia("Movie", "movie")
@ -86,12 +94,57 @@ async def test_media_player_playback(
}, },
True, True,
) )
assert playmedia_mock.called
# Test movie success with resume
playmedia_mock.reset()
with patch("plexapi.library.LibrarySection.search", return_value=movies):
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player,
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_MOVIE,
ATTR_MEDIA_CONTENT_ID: '{"library_name": "Movies", "title": "Movie 1", "resume": true}',
},
True,
)
assert playmedia_mock.called
assert playmedia_mock.last_request.qs["offset"][0] == str(movie1.viewOffset)
# Test movie success with media browser URL
playmedia_mock.reset()
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player,
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_MOVIE,
ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME
+ f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}/1",
},
True,
)
assert playmedia_mock.called
# Test movie success with legacy media browser URL
playmedia_mock.reset()
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player,
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_MOVIE,
ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME + "1",
},
True,
)
assert playmedia_mock.called
# Test multiple choices with exact match # Test multiple choices with exact match
playmedia_mock.reset()
movies = [movie1, movie2] movies = [movie1, movie2]
with patch("plexapi.library.LibrarySection.search", return_value=movies), patch( with patch("plexapi.library.LibrarySection.search", return_value=movies):
"homeassistant.components.plex.server.PlexServer.create_playqueue"
) as mock_create_playqueue:
assert await hass.services.async_call( assert await hass.services.async_call(
MP_DOMAIN, MP_DOMAIN,
SERVICE_PLAY_MEDIA, SERVICE_PLAY_MEDIA,
@ -102,9 +155,10 @@ async def test_media_player_playback(
}, },
True, True,
) )
assert mock_create_playqueue.call_args.args == (movie1,) assert playmedia_mock.called
# Test multiple choices without exact match # Test multiple choices without exact match
playmedia_mock.reset()
movies = [movie2, movie3] movies = [movie2, movie3]
with pytest.raises(HomeAssistantError) as excinfo: with pytest.raises(HomeAssistantError) as excinfo:
payload = '{"library_name": "Movies", "title": "Movie" }' payload = '{"library_name": "Movies", "title": "Movie" }'
@ -119,6 +173,7 @@ async def test_media_player_playback(
}, },
True, True,
) )
assert not playmedia_mock.called
assert "Multiple matches, make content_id more specific" in str(excinfo.value) assert "Multiple matches, make content_id more specific" in str(excinfo.value)
# Test multiple choices with allow_multiple # Test multiple choices with allow_multiple
@ -137,3 +192,20 @@ async def test_media_player_playback(
True, True,
) )
assert mock_create_playqueue.call_args.args == (movies,) assert mock_create_playqueue.call_args.args == (movies,)
assert playmedia_mock.called
# Test radio station
playmedia_mock.reset()
radio_id = "/library/sections/3/stations/1"
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player,
ATTR_MEDIA_CONTENT_TYPE: "station",
ATTR_MEDIA_CONTENT_ID: PLEX_URI_SCHEME
+ f"{DEFAULT_DATA[CONF_SERVER_IDENTIFIER]}/{radio_id}",
},
True,
)
assert playmedia_mock.called

View File

@ -13,10 +13,11 @@ from homeassistant.components.plex.const import (
CONF_SERVER_IDENTIFIER, CONF_SERVER_IDENTIFIER,
DOMAIN, DOMAIN,
PLEX_SERVER_CONFIG, PLEX_SERVER_CONFIG,
PLEX_URI_SCHEME,
SERVICE_REFRESH_LIBRARY, SERVICE_REFRESH_LIBRARY,
SERVICE_SCAN_CLIENTS, SERVICE_SCAN_CLIENTS,
) )
from homeassistant.components.plex.services import lookup_plex_media from homeassistant.components.plex.services import process_plex_payload
from homeassistant.const import CONF_URL from homeassistant.const import CONF_URL
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
@ -121,19 +122,25 @@ async def test_lookup_media_for_other_integrations(
playqueue_created, playqueue_created,
): ):
"""Test media lookup for media_player.play_media calls from cast/sonos.""" """Test media lookup for media_player.play_media calls from cast/sonos."""
CONTENT_ID = '{"library_name": "Music", "artist_name": "Artist"}' CONTENT_ID = PLEX_URI_SCHEME + '{"library_name": "Music", "artist_name": "Artist"}'
CONTENT_ID_KEY = "100" CONTENT_ID_KEY = PLEX_URI_SCHEME + "100"
CONTENT_ID_BAD_MEDIA = '{"library_name": "Music", "artist_name": "Not an Artist"}' CONTENT_ID_BAD_MEDIA = (
CONTENT_ID_PLAYQUEUE = '{"playqueue_id": 1234}' PLEX_URI_SCHEME + '{"library_name": "Music", "artist_name": "Not an Artist"}'
CONTENT_ID_BAD_PLAYQUEUE = '{"playqueue_id": 1235}' )
CONTENT_ID_SERVER = '{"plex_server": "Plex Server 1", "library_name": "Music", "artist_name": "Artist"}' CONTENT_ID_PLAYQUEUE = PLEX_URI_SCHEME + '{"playqueue_id": 1234}'
CONTENT_ID_BAD_PLAYQUEUE = PLEX_URI_SCHEME + '{"playqueue_id": 1235}'
CONTENT_ID_SERVER = (
PLEX_URI_SCHEME
+ '{"plex_server": "Plex Server 1", "library_name": "Music", "artist_name": "Artist"}'
)
CONTENT_ID_SHUFFLE = ( CONTENT_ID_SHUFFLE = (
'{"library_name": "Music", "artist_name": "Artist", "shuffle": 1}' PLEX_URI_SCHEME
+ '{"library_name": "Music", "artist_name": "Artist", "shuffle": 1}'
) )
# Test with no Plex integration available # Test with no Plex integration available
with pytest.raises(HomeAssistantError) as excinfo: with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID) process_plex_payload(hass, MEDIA_TYPE_MUSIC, CONTENT_ID)
assert "Plex integration not configured" in str(excinfo.value) assert "Plex integration not configured" in str(excinfo.value)
with patch( with patch(
@ -145,45 +152,61 @@ async def test_lookup_media_for_other_integrations(
# Test with no Plex servers available # Test with no Plex servers available
with pytest.raises(HomeAssistantError) as excinfo: with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID) process_plex_payload(hass, MEDIA_TYPE_MUSIC, CONTENT_ID)
assert "No Plex servers available" in str(excinfo.value) assert "No Plex servers available" in str(excinfo.value)
# Complete setup of a Plex server # Complete setup of a Plex server
await hass.config_entries.async_unload(entry.entry_id) await hass.config_entries.async_unload(entry.entry_id)
await setup_plex_server() await setup_plex_server()
# Test lookup success # Test lookup success without playqueue
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID) result = process_plex_payload(
assert isinstance(result, plexapi.audio.Artist) hass, MEDIA_TYPE_MUSIC, CONTENT_ID, supports_playqueues=False
)
assert isinstance(result.media, plexapi.audio.Artist)
assert not result.shuffle
# Test media key payload # Test media key payload without playqueue
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_KEY) result = process_plex_payload(
assert isinstance(result, plexapi.audio.Track) hass, MEDIA_TYPE_MUSIC, CONTENT_ID_KEY, supports_playqueues=False
)
assert isinstance(result.media, plexapi.audio.Track)
assert not result.shuffle
# Test with specified server # Test with specified server without playqueue
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SERVER) result = process_plex_payload(
assert isinstance(result, plexapi.audio.Artist) hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SERVER, supports_playqueues=False
)
assert isinstance(result.media, plexapi.audio.Artist)
assert not result.shuffle
# Test shuffle without playqueue
result = process_plex_payload(
hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SHUFFLE, supports_playqueues=False
)
assert isinstance(result.media, plexapi.audio.Artist)
assert result.shuffle
# Test with media not found # Test with media not found
with patch("plexapi.library.LibrarySection.search", return_value=None): with patch("plexapi.library.LibrarySection.search", return_value=None):
with pytest.raises(HomeAssistantError) as excinfo: with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_BAD_MEDIA) process_plex_payload(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_BAD_MEDIA)
assert f"No {MEDIA_TYPE_MUSIC} results in 'Music' for" in str(excinfo.value) assert f"No {MEDIA_TYPE_MUSIC} results in 'Music' for" in str(excinfo.value)
# Test with playqueue # Test with playqueue
requests_mock.get("https://1.2.3.4:32400/playQueues/1234", text=playqueue_1234) requests_mock.get("https://1.2.3.4:32400/playQueues/1234", text=playqueue_1234)
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_PLAYQUEUE) result = process_plex_payload(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_PLAYQUEUE)
assert isinstance(result, plexapi.playqueue.PlayQueue) assert isinstance(result.media, plexapi.playqueue.PlayQueue)
# Test with invalid playqueue # Test with invalid playqueue
requests_mock.get( requests_mock.get(
"https://1.2.3.4:32400/playQueues/1235", status_code=HTTPStatus.NOT_FOUND "https://1.2.3.4:32400/playQueues/1235", status_code=HTTPStatus.NOT_FOUND
) )
with pytest.raises(HomeAssistantError) as excinfo: with pytest.raises(HomeAssistantError) as excinfo:
lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_BAD_PLAYQUEUE) process_plex_payload(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_BAD_PLAYQUEUE)
assert "PlayQueue '1235' could not be found" in str(excinfo.value) assert "PlayQueue '1235' could not be found" in str(excinfo.value)
# Test playqueue is created with shuffle # Test playqueue is created with shuffle
requests_mock.post("/playqueues", text=playqueue_created) requests_mock.post("/playqueues", text=playqueue_created)
result = lookup_plex_media(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SHUFFLE) result = process_plex_payload(hass, MEDIA_TYPE_MUSIC, CONTENT_ID_SHUFFLE)
assert isinstance(result, plexapi.playqueue.PlayQueue) assert isinstance(result.media, plexapi.playqueue.PlayQueue)

View File

@ -1,5 +1,6 @@
"""Tests for the Sonos Media Player platform.""" """Tests for the Sonos Media Player platform."""
from unittest.mock import patch import json
from unittest.mock import Mock, patch
import pytest import pytest
@ -10,23 +11,25 @@ from homeassistant.components.media_player.const import (
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
SERVICE_PLAY_MEDIA, SERVICE_PLAY_MEDIA,
) )
from homeassistant.components.plex.const import PLEX_URI_SCHEME from homeassistant.components.plex.const import DOMAIN as PLEX_DOMAIN, PLEX_URI_SCHEME
from homeassistant.const import ATTR_ENTITY_ID from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
async def test_plex_play_media(hass, async_autosetup_sonos): async def test_plex_play_media(hass, async_autosetup_sonos):
"""Test playing media via the Plex integration.""" """Test playing media via the Plex integration."""
mock_plex_server = Mock()
mock_lookup = mock_plex_server.lookup_media
media_player = "media_player.zone_a" media_player = "media_player.zone_a"
media_content_id = ( media_content_id = (
'{"library_name": "Music", "artist_name": "Artist", "album_name": "Album"}' '{"library_name": "Music", "artist_name": "Artist", "album_name": "Album"}'
) )
with patch( with patch(
"homeassistant.components.sonos.media_player.lookup_plex_media" "homeassistant.components.plex.services.get_plex_server",
) as mock_lookup, patch( return_value=mock_plex_server,
"soco.plugins.plex.PlexPlugin.add_to_queue" ), patch("soco.plugins.plex.PlexPlugin.add_to_queue") as mock_add_to_queue, patch(
) as mock_add_to_queue, patch(
"homeassistant.components.sonos.media_player.SonosMediaPlayerEntity.set_shuffle" "homeassistant.components.sonos.media_player.SonosMediaPlayerEntity.set_shuffle"
) as mock_shuffle: ) as mock_shuffle:
# Test successful Plex service call # Test successful Plex service call
@ -44,8 +47,8 @@ async def test_plex_play_media(hass, async_autosetup_sonos):
assert len(mock_lookup.mock_calls) == 1 assert len(mock_lookup.mock_calls) == 1
assert len(mock_add_to_queue.mock_calls) == 1 assert len(mock_add_to_queue.mock_calls) == 1
assert not mock_shuffle.called assert not mock_shuffle.called
assert mock_lookup.mock_calls[0][1][1] == MEDIA_TYPE_MUSIC assert mock_lookup.mock_calls[0][1][0] == MEDIA_TYPE_MUSIC
assert mock_lookup.mock_calls[0][1][2] == media_content_id assert mock_lookup.mock_calls[0][2] == json.loads(media_content_id)
# Test handling shuffle in payload # Test handling shuffle in payload
mock_lookup.reset_mock() mock_lookup.reset_mock()
@ -66,8 +69,8 @@ async def test_plex_play_media(hass, async_autosetup_sonos):
assert mock_shuffle.called assert mock_shuffle.called
assert len(mock_lookup.mock_calls) == 1 assert len(mock_lookup.mock_calls) == 1
assert len(mock_add_to_queue.mock_calls) == 1 assert len(mock_add_to_queue.mock_calls) == 1
assert mock_lookup.mock_calls[0][1][1] == MEDIA_TYPE_MUSIC assert mock_lookup.mock_calls[0][1][0] == MEDIA_TYPE_MUSIC
assert mock_lookup.mock_calls[0][1][2] == media_content_id assert mock_lookup.mock_calls[0][2] == json.loads(media_content_id)
# Test failed Plex service call # Test failed Plex service call
mock_lookup.reset_mock() mock_lookup.reset_mock()
@ -87,3 +90,32 @@ async def test_plex_play_media(hass, async_autosetup_sonos):
) )
assert mock_lookup.called assert mock_lookup.called
assert not mock_add_to_queue.called assert not mock_add_to_queue.called
# Test new media browser payload format
mock_lookup.reset_mock()
mock_lookup.side_effect = None
mock_add_to_queue.reset_mock()
server_id = "unique_id_123"
plex_item_key = 300
with patch(
"homeassistant.components.plex.services.get_plex_server",
return_value=mock_plex_server,
):
assert await hass.services.async_call(
MP_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: media_player,
ATTR_MEDIA_CONTENT_TYPE: MEDIA_TYPE_MUSIC,
ATTR_MEDIA_CONTENT_ID: f"{PLEX_URI_SCHEME}{server_id}/{plex_item_key}?shuffle=1",
},
blocking=True,
)
assert len(mock_lookup.mock_calls) == 1
assert len(mock_add_to_queue.mock_calls) == 1
assert mock_shuffle.called
assert mock_lookup.mock_calls[0][1][0] == PLEX_DOMAIN
assert mock_lookup.mock_calls[0][2] == {"plex_key": plex_item_key}