From dd7f282723d72163c71a3a2b86c1b762772288e0 Mon Sep 17 00:00:00 2001 From: jjlawren Date: Fri, 4 Sep 2020 04:32:36 -0500 Subject: [PATCH] Add new Plex movie lookup method for media_player.play_media (#39584) --- homeassistant/components/plex/errors.py | 4 + homeassistant/components/plex/media_search.py | 116 ++++++++++++++ homeassistant/components/plex/server.py | 149 ++++-------------- tests/components/plex/mock_classes.py | 8 +- tests/components/plex/test_server.py | 50 +++++- 5 files changed, 202 insertions(+), 125 deletions(-) create mode 100644 homeassistant/components/plex/media_search.py diff --git a/homeassistant/components/plex/errors.py b/homeassistant/components/plex/errors.py index 534c553d45e..aacc340e2b1 100644 --- a/homeassistant/components/plex/errors.py +++ b/homeassistant/components/plex/errors.py @@ -16,3 +16,7 @@ class ServerNotSpecified(PlexException): class ShouldUpdateConfigEntry(PlexException): """Config entry data is out of date and should be updated.""" + + +class MediaNotFound(PlexException): + """Media lookup failed for a given search query.""" diff --git a/homeassistant/components/plex/media_search.py b/homeassistant/components/plex/media_search.py new file mode 100644 index 00000000000..5992a49bf3b --- /dev/null +++ b/homeassistant/components/plex/media_search.py @@ -0,0 +1,116 @@ +"""Helper methods to search for Plex media.""" +import logging + +from plexapi.exceptions import BadRequest, NotFound + +from .errors import MediaNotFound + +_LOGGER = logging.getLogger(__name__) + + +def lookup_movie(library_section, **kwargs): + """Find a specific movie and return a Plex media object.""" + try: + title = kwargs["title"] + except KeyError: + _LOGGER.error("Must specify 'title' for this search") + return None + + try: + movies = library_section.search(**kwargs, libtype="movie", maxresults=3) + except BadRequest as err: + _LOGGER.error("Invalid search payload provided: %s", err) + return None + + if not movies: + raise MediaNotFound(f"Movie {title}") from None + + if len(movies) > 1: + exact_matches = [x for x in movies if x.title.lower() == title.lower()] + if len(exact_matches) == 1: + return exact_matches[0] + match_list = [f"{x.title} ({x.year})" for x in movies] + _LOGGER.warning("Multiple matches found during search: %s", match_list) + return None + + return movies[0] + + +def lookup_tv(library_section, **kwargs): + """Find TV media and return a Plex media object.""" + season_number = kwargs.get("season_number") + episode_number = kwargs.get("episode_number") + + try: + show_name = kwargs["show_name"] + show = library_section.get(show_name) + except KeyError: + _LOGGER.error("Must specify 'show_name' for this search") + return None + except NotFound as err: + raise MediaNotFound(f"Show {show_name}") from err + + if not season_number: + return show + + try: + season = show.season(int(season_number)) + except NotFound as err: + raise MediaNotFound(f"Season {season_number} of {show_name}") from err + + if not episode_number: + return season + + try: + return season.episode(episode=int(episode_number)) + except NotFound as err: + episode = f"S{str(season_number).zfill(2)}E{str(episode_number).zfill(2)}" + raise MediaNotFound(f"Episode {episode} of {show_name}") from err + + +def lookup_music(library_section, **kwargs): + """Search for music and return a Plex media object.""" + album_name = kwargs.get("album_name") + track_name = kwargs.get("track_name") + track_number = kwargs.get("track_number") + + try: + artist_name = kwargs["artist_name"] + artist = library_section.get(artist_name) + except KeyError: + _LOGGER.error("Must specify 'artist_name' for this search") + return None + except NotFound as err: + raise MediaNotFound(f"Artist {artist_name}") from err + + if album_name: + try: + album = artist.album(album_name) + except NotFound as err: + raise MediaNotFound(f"Album {album_name} by {artist_name}") from err + + if track_name: + try: + return album.track(track_name) + except NotFound as err: + raise MediaNotFound( + f"Track {track_name} on {album_name} by {artist_name}" + ) from err + + if track_number: + for track in album.tracks(): + if int(track.index) == int(track_number): + return track + + raise MediaNotFound( + f"Track {track_number} on {album_name} by {artist_name}" + ) from None + return album + + if track_name: + try: + return artist.get(track_name) + except NotFound as err: + raise MediaNotFound(f"Track {track_name} by {artist_name}") from err + + return artist diff --git a/homeassistant/components/plex/server.py b/homeassistant/components/plex/server.py index 1d237dedb01..f8706eadf22 100644 --- a/homeassistant/components/plex/server.py +++ b/homeassistant/components/plex/server.py @@ -14,6 +14,7 @@ import requests.exceptions from homeassistant.components.media_player import DOMAIN as MP_DOMAIN from homeassistant.components.media_player.const import ( MEDIA_TYPE_EPISODE, + MEDIA_TYPE_MOVIE, MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST, MEDIA_TYPE_VIDEO, @@ -41,7 +42,13 @@ from .const import ( X_PLEX_PRODUCT, X_PLEX_VERSION, ) -from .errors import NoServersFound, ServerNotSpecified, ShouldUpdateConfigEntry +from .errors import ( + MediaNotFound, + NoServersFound, + ServerNotSpecified, + ShouldUpdateConfigEntry, +) +from .media_search import lookup_movie, lookup_music, lookup_tv _LOGGER = logging.getLogger(__name__) @@ -487,7 +494,7 @@ class PlexServer: return None try: - library_name = kwargs["library_name"] + library_name = kwargs.pop("library_name") library_section = self.library.section(library_name) except KeyError: _LOGGER.error("Must specify 'library_name' for this search") @@ -496,125 +503,23 @@ class PlexServer: _LOGGER.error("Library '%s' not found", library_name) return None - def lookup_music(): - """Search for music and return a Plex media object.""" - album_name = kwargs.get("album_name") - track_name = kwargs.get("track_name") - track_number = kwargs.get("track_number") - - try: - artist_name = kwargs["artist_name"] - artist = library_section.get(artist_name) - except KeyError: - _LOGGER.error("Must specify 'artist_name' for this search") - return None - except NotFound: - _LOGGER.error( - "Artist '%s' not found in '%s'", artist_name, library_name - ) - return None - - if album_name: + try: + if media_type == MEDIA_TYPE_EPISODE: + return lookup_tv(library_section, **kwargs) + if media_type == MEDIA_TYPE_MOVIE: + return lookup_movie(library_section, **kwargs) + if media_type == MEDIA_TYPE_MUSIC: + return lookup_music(library_section, **kwargs) + if media_type == MEDIA_TYPE_VIDEO: + # Legacy method for compatibility try: - album = artist.album(album_name) - except NotFound: - _LOGGER.error( - "Album '%s' by '%s' not found", album_name, artist_name - ) + video_name = kwargs["video_name"] + return library_section.get(video_name) + except KeyError: + _LOGGER.error("Must specify 'video_name' for this search") return None - - if track_name: - try: - return album.track(track_name) - except NotFound: - _LOGGER.error( - "Track '%s' on '%s' by '%s' not found", - track_name, - album_name, - artist_name, - ) - return None - - if track_number: - for track in album.tracks(): - if int(track.index) == int(track_number): - return track - - _LOGGER.error( - "Track %d on '%s' by '%s' not found", - track_number, - album_name, - artist_name, - ) - return None - return album - - if track_name: - try: - return artist.get(track_name) - except NotFound: - _LOGGER.error( - "Track '%s' by '%s' not found", track_name, artist_name - ) - return None - - return artist - - def lookup_tv(): - """Find TV media and return a Plex media object.""" - season_number = kwargs.get("season_number") - episode_number = kwargs.get("episode_number") - - try: - show_name = kwargs["show_name"] - show = library_section.get(show_name) - except KeyError: - _LOGGER.error("Must specify 'show_name' for this search") - return None - except NotFound: - _LOGGER.error("Show '%s' not found in '%s'", show_name, library_name) - return None - - if not season_number: - return show - - try: - season = show.season(int(season_number)) - except NotFound: - _LOGGER.error( - "Season %d of '%s' not found", - season_number, - show_name, - ) - return None - - if not episode_number: - return season - - try: - return season.episode(episode=int(episode_number)) - except NotFound: - _LOGGER.error( - "Episode not found: %s - S%sE%s", - show_name, - str(season_number).zfill(2), - str(episode_number).zfill(2), - ) - return None - - if media_type == MEDIA_TYPE_MUSIC: - return lookup_music() - if media_type == MEDIA_TYPE_EPISODE: - return lookup_tv() - if media_type == MEDIA_TYPE_VIDEO: - try: - video_name = kwargs["video_name"] - return library_section.get(video_name) - except KeyError: - _LOGGER.error("Must specify 'video_name' for this search") - except NotFound: - _LOGGER.error( - "Movie '%s' not found in '%s'", - video_name, - library_name, - ) + except NotFound as err: + raise MediaNotFound(f"Video {video_name}") from err + except MediaNotFound as failed_item: + _LOGGER.error("%s not found in %s", failed_item, library_name) + return None diff --git a/tests/components/plex/mock_classes.py b/tests/components/plex/mock_classes.py index 1fc705be1ca..7cdac1b669a 100644 --- a/tests/components/plex/mock_classes.py +++ b/tests/components/plex/mock_classes.py @@ -414,6 +414,11 @@ class MockPlexLibrarySection: """Mock the key identifier property.""" return str(id(self.title)) + def search(self, **kwargs): + """Mock the LibrarySection search method.""" + if kwargs.get("libtype") == "movie": + return self.all() + def update(self): """Mock the update call.""" pass @@ -422,11 +427,12 @@ class MockPlexLibrarySection: class MockPlexMediaItem: """Mock a Plex Media instance.""" - def __init__(self, title, mediatype="video"): + def __init__(self, title, mediatype="video", year=2020): """Initialize the object.""" self.title = str(title) self.type = mediatype self.thumbUrl = "http://1.2.3.4/thumb.png" + self.year = year self._children = [] def __iter__(self): diff --git a/tests/components/plex/test_server.py b/tests/components/plex/test_server.py index d911b258635..b3623681f8a 100644 --- a/tests/components/plex/test_server.py +++ b/tests/components/plex/test_server.py @@ -1,7 +1,7 @@ """Tests for Plex server.""" import copy -from plexapi.exceptions import NotFound +from plexapi.exceptions import BadRequest, NotFound from requests.exceptions import RequestException from homeassistant.components.media_player import DOMAIN as MP_DOMAIN @@ -9,6 +9,7 @@ from homeassistant.components.media_player.const import ( ATTR_MEDIA_CONTENT_ID, ATTR_MEDIA_CONTENT_TYPE, MEDIA_TYPE_EPISODE, + MEDIA_TYPE_MOVIE, MEDIA_TYPE_MUSIC, MEDIA_TYPE_PLAYLIST, MEDIA_TYPE_VIDEO, @@ -32,6 +33,7 @@ from .mock_classes import ( MockPlexArtist, MockPlexLibrary, MockPlexLibrarySection, + MockPlexMediaItem, MockPlexSeason, MockPlexServer, MockPlexShow, @@ -454,7 +456,7 @@ async def test_media_lookups(hass): is None ) - # Movie searches + # Legacy Movie searches assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, video_name="Movie") is None assert loaded_server.lookup_media(MEDIA_TYPE_VIDEO, library_name="Movies") is None assert loaded_server.lookup_media( @@ -467,3 +469,47 @@ async def test_media_lookups(hass): ) is None ) + + # Movie searches + assert loaded_server.lookup_media(MEDIA_TYPE_MOVIE, title="Movie") is None + assert loaded_server.lookup_media(MEDIA_TYPE_MOVIE, library_name="Movies") is None + assert loaded_server.lookup_media( + MEDIA_TYPE_MOVIE, library_name="Movies", title="Movie" + ) + with patch.object(MockPlexLibrarySection, "search", side_effect=BadRequest): + assert ( + loaded_server.lookup_media( + MEDIA_TYPE_MOVIE, library_name="Movies", title="Not a Movie" + ) + is None + ) + with patch.object(MockPlexLibrarySection, "search", return_value=[]): + assert ( + loaded_server.lookup_media( + MEDIA_TYPE_MOVIE, library_name="Movies", title="Not a Movie" + ) + is None + ) + + similar_movies = [] + for title in "Duplicate Movie", "Duplicate Movie 2": + similar_movies.append(MockPlexMediaItem(title)) + with patch.object( + loaded_server.library.section("Movies"), "search", return_value=similar_movies + ): + found_media = loaded_server.lookup_media( + MEDIA_TYPE_MOVIE, library_name="Movies", title="Duplicate Movie" + ) + assert found_media.title == "Duplicate Movie" + + duplicate_movies = [] + for title in "Duplicate Movie - Original", "Duplicate Movie - Remake": + duplicate_movies.append(MockPlexMediaItem(title)) + with patch.object( + loaded_server.library.section("Movies"), "search", return_value=duplicate_movies + ): + assert ( + loaded_server.lookup_media( + MEDIA_TYPE_MOVIE, library_name="Movies", title="Duplicate Movie" + ) + ) is None