Add tests for Plex media browser (#39220)

This commit is contained in:
jjlawren 2020-08-26 16:24:44 -05:00 committed by GitHub
parent a2651845f3
commit 1bc4de2bd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 354 additions and 57 deletions

View File

@ -1,4 +1,9 @@
"""Mock classes used in tests.""" """Mock classes used in tests."""
from functools import lru_cache
from aiohttp.helpers import reify
from plexapi.exceptions import NotFound
from homeassistant.components.plex.const import ( from homeassistant.components.plex.const import (
CONF_SERVER, CONF_SERVER,
CONF_SERVER_IDENTIFIER, CONF_SERVER_IDENTIFIER,
@ -116,11 +121,15 @@ class MockPlexServer:
self._systemAccounts = list(map(MockPlexSystemAccount, range(num_users))) self._systemAccounts = list(map(MockPlexSystemAccount, range(num_users)))
self._library = None
self._clients = [] self._clients = []
self._sessions = [] self._sessions = []
self.set_clients(num_users) self.set_clients(num_users)
self.set_sessions(num_users, session_type) self.set_sessions(num_users, session_type)
self._cache = {}
def set_clients(self, num_clients): def set_clients(self, num_clients):
"""Set up mock PlexClients for this PlexServer.""" """Set up mock PlexClients for this PlexServer."""
self._clients = [MockPlexClient(self._baseurl, x) for x in range(num_clients)] self._clients = [MockPlexClient(self._baseurl, x) for x in range(num_clients)]
@ -166,18 +175,32 @@ class MockPlexServer:
"""Mock version of PlexServer.""" """Mock version of PlexServer."""
return "1.0" return "1.0"
@property @reify
def library(self): def library(self):
"""Mock library object of PlexServer.""" """Mock library object of PlexServer."""
return MockPlexLibrary() return MockPlexLibrary(self)
def playlist(self, playlist): def playlist(self, playlist):
"""Mock the playlist lookup method.""" """Mock the playlist lookup method."""
return MockPlexMediaItem(playlist, mediatype="playlist") return MockPlexMediaItem(playlist, mediatype="playlist")
@lru_cache()
def playlists(self):
"""Mock the playlists lookup method with a lazy init."""
return [
MockPlexPlaylist(
self.library.section("Movies").all()
+ self.library.section("TV Shows").all()
),
MockPlexPlaylist(self.library.section("Music").all()),
]
def fetchItem(self, item): def fetchItem(self, item):
"""Mock the fetchItem method.""" """Mock the fetchItem method."""
return MockPlexMediaItem("Item Name") for section in self.library.sections():
result = section.fetchItem(item)
if result:
return result
class MockPlexClient: class MockPlexClient:
@ -247,7 +270,7 @@ class MockPlexSession:
self.TYPE = mediatype self.TYPE = mediatype
self.usernames = [list(MOCK_USERS)[index]] self.usernames = [list(MOCK_USERS)[index]]
self.players = [player] self.players = [player]
self._section = MockPlexLibrarySection() self._section = MockPlexLibrarySection("Movies")
@property @property
def duration(self): def duration(self):
@ -302,26 +325,80 @@ class MockPlexSession:
class MockPlexLibrary: class MockPlexLibrary:
"""Mock a Plex Library instance.""" """Mock a Plex Library instance."""
def __init__(self): def __init__(self, plex_server):
"""Initialize the object.""" """Initialize the object."""
self._plex_server = plex_server
self._sections = {}
def section(self, library_name): for kind in ["Movies", "Music", "TV Shows", "Photos"]:
self._sections[kind] = MockPlexLibrarySection(kind)
def section(self, title):
"""Mock the LibrarySection lookup.""" """Mock the LibrarySection lookup."""
return MockPlexLibrarySection(library_name) section = self._sections.get(title)
if section:
return section
raise NotFound
def sections(self):
"""Return all available sections."""
return self._sections.values()
def sectionByID(self, section_id):
"""Mock the sectionByID lookup."""
return [x for x in self.sections() if x.key == section_id][0]
class MockPlexLibrarySection: class MockPlexLibrarySection:
"""Mock a Plex LibrarySection instance.""" """Mock a Plex LibrarySection instance."""
def __init__(self, library="Movies"): def __init__(self, library):
"""Initialize the object.""" """Initialize the object."""
self.title = library self.title = library
if library == "Music":
self._item = MockPlexArtist("Artist")
elif library == "TV Shows":
self._item = MockPlexShow("TV Show")
else:
self._item = MockPlexMediaItem(library[:-1])
def get(self, query): def get(self, query):
"""Mock the get lookup method.""" """Mock the get lookup method."""
if self._item.title == query:
return self._item
raise NotFound
def all(self):
"""Mock the all method."""
return [self._item]
def fetchItem(self, ratingKey):
"""Return a specific item."""
for item in self.all():
if item.ratingKey == ratingKey:
return item
if item._children:
for child in item._children:
if child.ratingKey == ratingKey:
return child
@property
def type(self):
"""Mock the library type."""
if self.title == "Movies":
return "movie"
if self.title == "Music": if self.title == "Music":
return MockPlexArtist(query) return "artist"
return MockPlexMediaItem(query) if self.title == "TV Shows":
return "show"
if self.title == "Photos":
return "photo"
@property
def key(self):
"""Mock the key identifier property."""
return str(id(self.title))
class MockPlexMediaItem: class MockPlexMediaItem:
@ -331,27 +408,76 @@ class MockPlexMediaItem:
"""Initialize the object.""" """Initialize the object."""
self.title = str(title) self.title = str(title)
self.type = mediatype self.type = mediatype
self.thumbUrl = "http://1.2.3.4/thumb.png"
self._children = []
def album(self, album): def __iter__(self):
"""Mock the album lookup method.""" """Provide iterator."""
return MockPlexMediaItem(album, mediatype="album") yield from self._children
def track(self, track): @property
"""Mock the track lookup method.""" def ratingKey(self):
return MockPlexMediaTrack() """Mock the ratingKey property."""
return id(self.title)
def tracks(self):
"""Mock the tracks lookup method."""
for index in range(1, 10):
yield MockPlexMediaTrack(index)
def episode(self, episode): class MockPlexPlaylist(MockPlexMediaItem):
"""Mock the episode lookup method.""" """Mock a Plex Playlist instance."""
return MockPlexMediaItem(episode, mediatype="episode")
def __init__(self, items):
"""Initialize the object."""
super().__init__(f"Playlist ({len(items)} Items)", "playlist")
for item in items:
self._children.append(item)
class MockPlexShow(MockPlexMediaItem):
"""Mock a Plex Show instance."""
def __init__(self, show):
"""Initialize the object."""
super().__init__(show, "show")
for index in range(1, 5):
self._children.append(MockPlexSeason(index))
def season(self, season): def season(self, season):
"""Mock the season lookup method.""" """Mock the season lookup method."""
return MockPlexMediaItem(season, mediatype="season") return [x for x in self._children if x.title == f"Season {season}"][0]
class MockPlexSeason(MockPlexMediaItem):
"""Mock a Plex Season instance."""
def __init__(self, season):
"""Initialize the object."""
super().__init__(f"Season {season}", "season")
for index in range(1, 10):
self._children.append(MockPlexMediaItem(f"Episode {index}", "episode"))
def episode(self, episode):
"""Mock the episode lookup method."""
return self._children[episode - 1]
class MockPlexAlbum(MockPlexMediaItem):
"""Mock a Plex Album instance."""
def __init__(self, album):
"""Initialize the object."""
super().__init__(album, "album")
for index in range(1, 10):
self._children.append(MockPlexMediaTrack(index))
def track(self, track):
"""Mock the track lookup method."""
try:
return [x for x in self._children if x.title == track][0]
except IndexError:
raise NotFound
def tracks(self):
"""Mock the tracks lookup method."""
return self._children
class MockPlexArtist(MockPlexMediaItem): class MockPlexArtist(MockPlexMediaItem):
@ -359,12 +485,16 @@ class MockPlexArtist(MockPlexMediaItem):
def __init__(self, artist): def __init__(self, artist):
"""Initialize the object.""" """Initialize the object."""
super().__init__(artist) super().__init__(artist, "artist")
self.type = "artist" self._album = MockPlexAlbum("Album")
def album(self, album):
"""Mock the album lookup method."""
return self._album
def get(self, track): def get(self, track):
"""Mock the track lookup method.""" """Mock the track lookup method."""
return MockPlexMediaTrack() return self._album.track(track)
class MockPlexMediaTrack(MockPlexMediaItem): class MockPlexMediaTrack(MockPlexMediaItem):

View File

@ -0,0 +1,165 @@
"""Tests for Plex media browser."""
from homeassistant.components.media_player.const import (
ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE,
)
from homeassistant.components.plex.const import CONF_SERVER_IDENTIFIER, DOMAIN
from homeassistant.components.websocket_api.const import ERR_UNKNOWN_ERROR, TYPE_RESULT
from .const import DEFAULT_DATA, DEFAULT_OPTIONS
from .helpers import trigger_plex_update
from .mock_classes import MockPlexAccount, MockPlexServer
from tests.async_mock import patch
from tests.common import MockConfigEntry
async def test_browse_media(hass, hass_ws_client):
"""Test getting Plex clients from plex.tv."""
entry = MockConfigEntry(
domain=DOMAIN,
data=DEFAULT_DATA,
options=DEFAULT_OPTIONS,
unique_id=DEFAULT_DATA["server_id"],
)
mock_plex_server = MockPlexServer(config_entry=entry)
mock_plex_account = MockPlexAccount()
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
"plexapi.myplex.MyPlexAccount", return_value=mock_plex_account
), patch(
"homeassistant.components.plex.PlexWebsocket", autospec=True
) as mock_websocket:
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
websocket_client = await hass_ws_client(hass)
trigger_plex_update(mock_websocket)
await hass.async_block_till_done()
media_players = hass.states.async_entity_ids("media_player")
msg_id = 1
# Browse base of non-existent Plex server
await websocket_client.send_json(
{
"id": msg_id,
"type": "media_player/browse_media",
"entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: "server",
ATTR_MEDIA_CONTENT_ID: "this server does not exist",
}
)
msg = await websocket_client.receive_json()
assert msg["id"] == msg_id
assert msg["type"] == TYPE_RESULT
assert not msg["success"]
assert msg["error"]["code"] == ERR_UNKNOWN_ERROR
# Browse base of Plex server
msg_id += 1
await websocket_client.send_json(
{
"id": msg_id,
"type": "media_player/browse_media",
"entity_id": media_players[0],
}
)
msg = await websocket_client.receive_json()
assert msg["id"] == msg_id
assert msg["type"] == TYPE_RESULT
assert msg["success"]
result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "server"
assert result[ATTR_MEDIA_CONTENT_ID] == DEFAULT_DATA[CONF_SERVER_IDENTIFIER]
assert len(result["children"]) == len(mock_plex_server.library.sections())
tvshows = next(iter(x for x in result["children"] if x["title"] == "TV Shows"))
playlists = next(iter(x for x in result["children"] if x["title"] == "Playlists"))
# Browse into a Plex TV show library
msg_id += 1
await websocket_client.send_json(
{
"id": msg_id,
"type": "media_player/browse_media",
"entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: tvshows[ATTR_MEDIA_CONTENT_TYPE],
ATTR_MEDIA_CONTENT_ID: str(tvshows[ATTR_MEDIA_CONTENT_ID]),
}
)
msg = await websocket_client.receive_json()
assert msg["id"] == msg_id
assert msg["type"] == TYPE_RESULT
assert msg["success"]
result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "library"
result_id = result[ATTR_MEDIA_CONTENT_ID]
assert len(result["children"]) == len(
mock_plex_server.library.sectionByID(result_id).all()
)
# Browse into a Plex TV show
msg_id += 1
await websocket_client.send_json(
{
"id": msg_id,
"type": "media_player/browse_media",
"entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: result["children"][0][ATTR_MEDIA_CONTENT_TYPE],
ATTR_MEDIA_CONTENT_ID: str(result["children"][0][ATTR_MEDIA_CONTENT_ID]),
}
)
msg = await websocket_client.receive_json()
assert msg["id"] == msg_id
assert msg["type"] == TYPE_RESULT
assert msg["success"]
result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "show"
result_id = int(result[ATTR_MEDIA_CONTENT_ID])
assert result["title"] == mock_plex_server.fetchItem(result_id).title
# Browse into a non-existent TV season
msg_id += 1
await websocket_client.send_json(
{
"id": msg_id,
"type": "media_player/browse_media",
"entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: result["children"][0][ATTR_MEDIA_CONTENT_TYPE],
ATTR_MEDIA_CONTENT_ID: str(99999999999999),
}
)
msg = await websocket_client.receive_json()
assert msg["id"] == msg_id
assert msg["type"] == TYPE_RESULT
assert not msg["success"]
assert msg["error"]["code"] == ERR_UNKNOWN_ERROR
# Browse Plex playlists
msg_id += 1
await websocket_client.send_json(
{
"id": msg_id,
"type": "media_player/browse_media",
"entity_id": media_players[0],
ATTR_MEDIA_CONTENT_TYPE: playlists[ATTR_MEDIA_CONTENT_TYPE],
ATTR_MEDIA_CONTENT_ID: str(playlists[ATTR_MEDIA_CONTENT_ID]),
}
)
msg = await websocket_client.receive_json()
assert msg["id"] == msg_id
assert msg["type"] == TYPE_RESULT
assert msg["success"]
result = msg["result"]
assert result[ATTR_MEDIA_CONTENT_TYPE] == "playlists"
result_id = result[ATTR_MEDIA_CONTENT_ID]

View File

@ -28,11 +28,13 @@ from .const import DEFAULT_DATA, DEFAULT_OPTIONS
from .helpers import trigger_plex_update from .helpers import trigger_plex_update
from .mock_classes import ( from .mock_classes import (
MockPlexAccount, MockPlexAccount,
MockPlexAlbum,
MockPlexArtist, MockPlexArtist,
MockPlexLibrary, MockPlexLibrary,
MockPlexLibrarySection, MockPlexLibrarySection,
MockPlexMediaItem, MockPlexSeason,
MockPlexServer, MockPlexServer,
MockPlexShow,
) )
from tests.async_mock import patch from tests.async_mock import patch
@ -298,7 +300,7 @@ async def test_media_lookups(hass):
with patch.object(MockPlexLibrary, "section", side_effect=NotFound): with patch.object(MockPlexLibrary, "section", side_effect=NotFound):
assert ( assert (
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, library_name="Not a Library", show_name="A TV Show" MEDIA_TYPE_EPISODE, library_name="Not a Library", show_name="TV Show"
) )
is None is None
) )
@ -316,37 +318,37 @@ async def test_media_lookups(hass):
is None is None
) )
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, library_name="TV Shows", show_name="A TV Show" MEDIA_TYPE_EPISODE, library_name="TV Shows", show_name="TV Show"
) )
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, MEDIA_TYPE_EPISODE,
library_name="TV Shows", library_name="TV Shows",
show_name="A TV Show", show_name="TV Show",
season_number=2, season_number=2,
) )
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, MEDIA_TYPE_EPISODE,
library_name="TV Shows", library_name="TV Shows",
show_name="A TV Show", show_name="TV Show",
season_number=2, season_number=2,
episode_number=3, episode_number=3,
) )
with patch.object(MockPlexMediaItem, "season", side_effect=NotFound): with patch.object(MockPlexShow, "season", side_effect=NotFound):
assert ( assert (
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, MEDIA_TYPE_EPISODE,
library_name="TV Shows", library_name="TV Shows",
show_name="A TV Show", show_name="TV Show",
season_number=2, season_number=2,
) )
is None is None
) )
with patch.object(MockPlexMediaItem, "episode", side_effect=NotFound): with patch.object(MockPlexSeason, "episode", side_effect=NotFound):
assert ( assert (
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_EPISODE, MEDIA_TYPE_EPISODE,
library_name="TV Shows", library_name="TV Shows",
show_name="A TV Show", show_name="TV Show",
season_number=2, season_number=2,
episode_number=1, episode_number=1,
) )
@ -356,24 +358,24 @@ async def test_media_lookups(hass):
# Music searches # Music searches
assert ( assert (
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, library_name="Music", album_name="An Album" MEDIA_TYPE_MUSIC, library_name="Music", album_name="Album"
) )
is None is None
) )
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, library_name="Music", artist_name="An Artist" MEDIA_TYPE_MUSIC, library_name="Music", artist_name="Artist"
) )
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
track_name="A Track", track_name="Track 3",
) )
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
album_name="An Album", album_name="Album",
) )
with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound): with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound):
assert ( assert (
@ -381,7 +383,7 @@ async def test_media_lookups(hass):
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="Not an Artist", artist_name="Not an Artist",
album_name="An Album", album_name="Album",
) )
is None is None
) )
@ -390,18 +392,18 @@ async def test_media_lookups(hass):
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
album_name="Not an Album", album_name="Not an Album",
) )
is None is None
) )
with patch.object(MockPlexMediaItem, "track", side_effect=NotFound): with patch.object(MockPlexAlbum, "track", side_effect=NotFound):
assert ( assert (
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
album_name="An Album", album_name=" Album",
track_name="Not a Track", track_name="Not a Track",
) )
is None is None
@ -411,7 +413,7 @@ async def test_media_lookups(hass):
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
track_name="Not a Track", track_name="Not a Track",
) )
is None is None
@ -419,16 +421,16 @@ async def test_media_lookups(hass):
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
album_name="An Album", album_name="Album",
track_number=3, track_number=3,
) )
assert ( assert (
loaded_server.lookup_media( loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
album_name="An Album", album_name="Album",
track_number=30, track_number=30,
) )
is None is None
@ -436,9 +438,9 @@ async def test_media_lookups(hass):
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_MUSIC, MEDIA_TYPE_MUSIC,
library_name="Music", library_name="Music",
artist_name="An Artist", artist_name="Artist",
album_name="An Album", album_name="Album",
track_name="A Track", track_name="Track 3",
) )
# Playlist searches # Playlist searches
@ -453,10 +455,10 @@ async def test_media_lookups(hass):
) )
# Movie searches # Movie searches
assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, video_name="A Movie") is None assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, video_name="Movie") is None
assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, library_name="Movies") is None assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, library_name="Movies") is None
assert loaded_server.lookup_media( assert loaded_server.lookup_media(
MEDIA_TYPE_VIDEO, library_name="Movies", video_name="A Movie" MEDIA_TYPE_VIDEO, library_name="Movies", video_name="Movie"
) )
with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound): with patch.object(MockPlexLibrarySection, "get", side_effect=NotFound):
assert ( assert (