mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 05:07:41 +00:00
Plex tests cleanup and additions (#37117)
This commit is contained in:
parent
56907392d3
commit
40573bf393
@ -104,14 +104,6 @@ class PlexServer:
|
||||
raise
|
||||
return self._plex_account
|
||||
|
||||
@property
|
||||
def plextv_resources(self):
|
||||
"""Return all resources linked to Plex account."""
|
||||
if self.account is None:
|
||||
return []
|
||||
|
||||
return self.account.resources()
|
||||
|
||||
def plextv_clients(self):
|
||||
"""Return available clients linked to Plex account."""
|
||||
if self.account is None:
|
||||
@ -122,7 +114,7 @@ class PlexServer:
|
||||
self._plextv_client_timestamp = now
|
||||
self._plextv_clients = [
|
||||
x
|
||||
for x in self.plextv_resources
|
||||
for x in self.account.resources()
|
||||
if "player" in x.provides and x.presence
|
||||
]
|
||||
_LOGGER.debug(
|
||||
@ -137,7 +129,7 @@ class PlexServer:
|
||||
def _connect_with_token():
|
||||
available_servers = [
|
||||
(x.name, x.clientIdentifier)
|
||||
for x in self.plextv_resources
|
||||
for x in self.account.resources()
|
||||
if "server" in x.provides
|
||||
]
|
||||
|
||||
@ -165,7 +157,7 @@ class PlexServer:
|
||||
def _update_plexdirect_hostname():
|
||||
matching_servers = [
|
||||
x.name
|
||||
for x in self.plextv_resources
|
||||
for x in self.account.resources()
|
||||
if x.clientIdentifier == self._server_id
|
||||
]
|
||||
if matching_servers:
|
||||
|
9
tests/components/plex/helpers.py
Normal file
9
tests/components/plex/helpers.py
Normal file
@ -0,0 +1,9 @@
|
||||
"""Helper methods for Plex tests."""
|
||||
from homeassistant.components.plex.const import DOMAIN, WEBSOCKETS
|
||||
|
||||
|
||||
def trigger_plex_update(hass, plex_server):
|
||||
"""Call the websocket callback method."""
|
||||
server_id = plex_server.machineIdentifier
|
||||
websocket = hass.data[DOMAIN][WEBSOCKETS][server_id]
|
||||
websocket.callback()
|
@ -18,7 +18,6 @@ from homeassistant.components.plex.const import (
|
||||
DOMAIN,
|
||||
MANUAL_SETUP_STRING,
|
||||
PLEX_SERVER_CONFIG,
|
||||
PLEX_UPDATE_PLATFORMS_SIGNAL,
|
||||
SERVERS,
|
||||
)
|
||||
from homeassistant.config import async_process_ha_core_config
|
||||
@ -34,9 +33,9 @@ from homeassistant.const import (
|
||||
CONF_URL,
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
|
||||
from .const import DEFAULT_DATA, DEFAULT_OPTIONS, MOCK_SERVERS, MOCK_TOKEN
|
||||
from .helpers import trigger_plex_update
|
||||
from .mock_classes import MockGDM, MockPlexAccount, MockPlexServer, MockResource
|
||||
|
||||
from tests.async_mock import patch
|
||||
@ -480,7 +479,7 @@ async def test_option_flow_new_users_available(hass, caplog):
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
with patch("plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount()):
|
||||
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users
|
||||
|
@ -14,96 +14,15 @@ from homeassistant.config_entries import (
|
||||
ENTRY_STATE_SETUP_RETRY,
|
||||
)
|
||||
from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
import homeassistant.util.dt as dt_util
|
||||
|
||||
from .const import DEFAULT_DATA, DEFAULT_OPTIONS
|
||||
from .helpers import trigger_plex_update
|
||||
from .mock_classes import MockPlexAccount, MockPlexServer
|
||||
|
||||
from tests.async_mock import patch
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
||||
# class TestClockedPlex(ClockedTestCase):
|
||||
# """Create clock-controlled tests.async_mock class."""
|
||||
|
||||
# @pytest.fixture(autouse=True)
|
||||
# def inject_fixture(self, caplog, hass_storage):
|
||||
# """Inject pytest fixtures as instance attributes."""
|
||||
# self.caplog = caplog
|
||||
|
||||
# async def setUp(self):
|
||||
# """Initialize this test class."""
|
||||
# self.hass = await async_test_home_assistant(self.loop)
|
||||
|
||||
# async def tearDown(self):
|
||||
# """Clean up the HomeAssistant instance."""
|
||||
# await self.hass.async_stop()
|
||||
|
||||
# async def test_setup_with_config_entry(self):
|
||||
# """Test setup component with config."""
|
||||
# hass = self.hass
|
||||
|
||||
# mock_plex_server = MockPlexServer()
|
||||
|
||||
# entry = MockConfigEntry(
|
||||
# domain=const.DOMAIN,
|
||||
# data=DEFAULT_DATA,
|
||||
# options=DEFAULT_OPTIONS,
|
||||
# unique_id=DEFAULT_DATA["server_id"],
|
||||
# )
|
||||
|
||||
# with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
# "homeassistant.components.plex.PlexWebsocket.listen"
|
||||
# ) as mock_listen:
|
||||
# entry.add_to_hass(hass)
|
||||
# assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
# await hass.async_block_till_done()
|
||||
|
||||
# assert mock_listen.called
|
||||
|
||||
# assert len(hass.config_entries.async_entries(const.DOMAIN)) == 1
|
||||
# assert entry.state == ENTRY_STATE_LOADED
|
||||
|
||||
# server_id = mock_plex_server.machineIdentifier
|
||||
# loaded_server = hass.data[const.DOMAIN][const.SERVERS][server_id]
|
||||
|
||||
# assert loaded_server.plex_server == mock_plex_server
|
||||
|
||||
# async_dispatcher_send(
|
||||
# hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
|
||||
# )
|
||||
# await hass.async_block_till_done()
|
||||
|
||||
# sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
# assert sensor.state == str(len(mock_plex_server.accounts))
|
||||
|
||||
# # Ensure existing entities refresh
|
||||
# await self.advance(const.DEBOUNCE_TIMEOUT)
|
||||
# async_dispatcher_send(
|
||||
# hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
|
||||
# )
|
||||
# await hass.async_block_till_done()
|
||||
|
||||
# for test_exception in (
|
||||
# plexapi.exceptions.BadRequest,
|
||||
# requests.exceptions.RequestException,
|
||||
# ):
|
||||
# with patch.object(
|
||||
# mock_plex_server, "clients", side_effect=test_exception
|
||||
# ) as patched_clients_bad_request:
|
||||
# await self.advance(const.DEBOUNCE_TIMEOUT)
|
||||
# async_dispatcher_send(
|
||||
# hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
|
||||
# )
|
||||
# await hass.async_block_till_done()
|
||||
|
||||
# assert patched_clients_bad_request.called
|
||||
# assert (
|
||||
# f"Could not connect to Plex server: {mock_plex_server.friendlyName}"
|
||||
# in self.caplog.text
|
||||
# )
|
||||
# self.caplog.clear()
|
||||
|
||||
|
||||
async def test_set_config_entry_unique_id(hass):
|
||||
"""Test updating missing unique_id from config entry."""
|
||||
@ -252,9 +171,7 @@ async def test_setup_with_photo_session(hass):
|
||||
assert len(hass.config_entries.async_entries(const.DOMAIN)) == 1
|
||||
assert entry.state == ENTRY_STATE_LOADED
|
||||
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
media_player = hass.states.get("media_player.plex_product_title")
|
||||
@ -343,6 +260,30 @@ async def test_tokenless_server(hass):
|
||||
unique_id=DEFAULT_DATA["server_id"],
|
||||
)
|
||||
|
||||
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
"homeassistant.components.plex.PlexWebsocket.listen"
|
||||
):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert entry.state == ENTRY_STATE_LOADED
|
||||
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_bad_token_with_tokenless_server(hass):
|
||||
"""Test setup with a bad token and a server with token auth disabled."""
|
||||
mock_plex_server = MockPlexServer()
|
||||
|
||||
entry = MockConfigEntry(
|
||||
domain=const.DOMAIN,
|
||||
data=DEFAULT_DATA,
|
||||
options=DEFAULT_OPTIONS,
|
||||
unique_id=DEFAULT_DATA["server_id"],
|
||||
)
|
||||
|
||||
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
"plexapi.myplex.MyPlexAccount", side_effect=plexapi.exceptions.Unauthorized
|
||||
), patch("homeassistant.components.plex.PlexWebsocket.listen"):
|
||||
@ -352,7 +293,5 @@ async def test_tokenless_server(hass):
|
||||
|
||||
assert entry.state == ENTRY_STATE_LOADED
|
||||
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
@ -2,6 +2,7 @@
|
||||
import copy
|
||||
|
||||
from plexapi.exceptions import NotFound
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
|
||||
from homeassistant.components.media_player.const import (
|
||||
@ -17,14 +18,14 @@ from homeassistant.components.plex.const import (
|
||||
CONF_IGNORE_NEW_SHARED_USERS,
|
||||
CONF_IGNORE_PLEX_WEB_CLIENTS,
|
||||
CONF_MONITORED_USERS,
|
||||
CONF_SERVER,
|
||||
DOMAIN,
|
||||
PLEX_UPDATE_PLATFORMS_SIGNAL,
|
||||
SERVERS,
|
||||
)
|
||||
from homeassistant.const import ATTR_ENTITY_ID
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
||||
|
||||
from .const import DEFAULT_DATA, DEFAULT_OPTIONS
|
||||
from .helpers import trigger_plex_update
|
||||
from .mock_classes import (
|
||||
MockPlexAccount,
|
||||
MockPlexArtist,
|
||||
@ -63,7 +64,7 @@ async def test_new_users_available(hass):
|
||||
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users
|
||||
@ -102,7 +103,7 @@ async def test_new_ignored_users_available(hass, caplog):
|
||||
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users
|
||||
@ -125,109 +126,77 @@ async def test_new_ignored_users_available(hass, caplog):
|
||||
assert sensor.state == str(len(mock_plex_server.accounts))
|
||||
|
||||
|
||||
# class TestClockedPlex(ClockedTestCase):
|
||||
# """Create clock-controlled tests.async_mock class."""
|
||||
async def test_network_error_during_refresh(hass, caplog):
|
||||
"""Test network failures during refreshes."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=DEFAULT_DATA,
|
||||
options=DEFAULT_OPTIONS,
|
||||
unique_id=DEFAULT_DATA["server_id"],
|
||||
)
|
||||
|
||||
# async def setUp(self):
|
||||
# """Initialize this test class."""
|
||||
# self.hass = await async_test_home_assistant(self.loop)
|
||||
mock_plex_server = MockPlexServer()
|
||||
|
||||
# async def tearDown(self):
|
||||
# """Clean up the HomeAssistant instance."""
|
||||
# await self.hass.async_stop()
|
||||
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
"plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount()
|
||||
), patch("homeassistant.components.plex.PlexWebsocket.listen"):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# async def test_mark_sessions_idle(self):
|
||||
# """Test marking media_players as idle when sessions end."""
|
||||
# hass = self.hass
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
loaded_server = hass.data[DOMAIN][SERVERS][server_id]
|
||||
|
||||
# entry = MockConfigEntry(
|
||||
# domain=DOMAIN,
|
||||
# data=DEFAULT_DATA,
|
||||
# options=DEFAULT_OPTIONS,
|
||||
# unique_id=DEFAULT_DATA["server_id"],
|
||||
# )
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# mock_plex_server = MockPlexServer(config_entry=entry)
|
||||
sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
assert sensor.state == str(len(mock_plex_server.accounts))
|
||||
|
||||
# with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
# "homeassistant.components.plex.PlexWebsocket.listen"
|
||||
# ):
|
||||
# entry.add_to_hass(hass)
|
||||
# assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
# await hass.async_block_till_done()
|
||||
with patch.object(mock_plex_server, "clients", side_effect=RequestException):
|
||||
await loaded_server._async_update_platforms()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# server_id = mock_plex_server.machineIdentifier
|
||||
assert (
|
||||
f"Could not connect to Plex server: {DEFAULT_DATA[CONF_SERVER]}" in caplog.text
|
||||
)
|
||||
|
||||
# async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
# await hass.async_block_till_done()
|
||||
|
||||
# sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
# assert sensor.state == str(len(mock_plex_server.accounts))
|
||||
async def test_mark_sessions_idle(hass):
|
||||
"""Test marking media_players as idle when sessions end."""
|
||||
entry = MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data=DEFAULT_DATA,
|
||||
options=DEFAULT_OPTIONS,
|
||||
unique_id=DEFAULT_DATA["server_id"],
|
||||
)
|
||||
|
||||
# mock_plex_server.clear_clients()
|
||||
# mock_plex_server.clear_sessions()
|
||||
mock_plex_server = MockPlexServer()
|
||||
|
||||
# await self.advance(DEBOUNCE_TIMEOUT)
|
||||
# async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
# await hass.async_block_till_done()
|
||||
with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
"plexapi.myplex.MyPlexAccount", return_value=MockPlexAccount()
|
||||
), patch("homeassistant.components.plex.PlexWebsocket.listen"):
|
||||
entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
# assert sensor.state == "0"
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
loaded_server = hass.data[DOMAIN][SERVERS][server_id]
|
||||
|
||||
# async def test_debouncer(self):
|
||||
# """Test debouncer behavior."""
|
||||
# hass = self.hass
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# entry = MockConfigEntry(
|
||||
# domain=DOMAIN,
|
||||
# data=DEFAULT_DATA,
|
||||
# options=DEFAULT_OPTIONS,
|
||||
# unique_id=DEFAULT_DATA["server_id"],
|
||||
# )
|
||||
sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
assert sensor.state == str(len(mock_plex_server.accounts))
|
||||
|
||||
# mock_plex_server = MockPlexServer(config_entry=entry)
|
||||
mock_plex_server.clear_clients()
|
||||
mock_plex_server.clear_sessions()
|
||||
|
||||
# with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
|
||||
# "homeassistant.components.plex.PlexWebsocket.listen"
|
||||
# ):
|
||||
# entry.add_to_hass(hass)
|
||||
# assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
# await hass.async_block_till_done()
|
||||
await loaded_server._async_update_platforms()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
# with patch.object(mock_plex_server, "clients", return_value=[]), patch.object(
|
||||
# mock_plex_server, "sessions", return_value=[]
|
||||
# ) as mock_update:
|
||||
# # Called immediately
|
||||
# async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
# await hass.async_block_till_done()
|
||||
# assert mock_update.call_count == 1
|
||||
|
||||
# # Throttled
|
||||
# async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
# await hass.async_block_till_done()
|
||||
# assert mock_update.call_count == 1
|
||||
|
||||
# # Throttled
|
||||
# async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
# await hass.async_block_till_done()
|
||||
# assert mock_update.call_count == 1
|
||||
|
||||
# # Called from scheduler
|
||||
# await self.advance(DEBOUNCE_TIMEOUT)
|
||||
# await hass.async_block_till_done()
|
||||
# assert mock_update.call_count == 2
|
||||
|
||||
# # Throttled
|
||||
# async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
# await hass.async_block_till_done()
|
||||
# assert mock_update.call_count == 2
|
||||
|
||||
# # Called from scheduler
|
||||
# await self.advance(DEBOUNCE_TIMEOUT)
|
||||
# await hass.async_block_till_done()
|
||||
# assert mock_update.call_count == 3
|
||||
sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
assert sensor.state == "0"
|
||||
|
||||
|
||||
async def test_ignore_plex_web_client(hass):
|
||||
@ -252,9 +221,7 @@ async def test_ignore_plex_web_client(hass):
|
||||
assert await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
server_id = mock_plex_server.machineIdentifier
|
||||
|
||||
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
sensor = hass.states.get("sensor.plex_plex_server_1")
|
||||
@ -288,7 +255,7 @@ async def test_media_lookups(hass):
|
||||
loaded_server = hass.data[DOMAIN][SERVERS][server_id]
|
||||
|
||||
# Plex Key searches
|
||||
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
|
||||
trigger_plex_update(hass, mock_plex_server)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
media_player_id = hass.states.async_entity_ids("media_player")[0]
|
||||
|
Loading…
x
Reference in New Issue
Block a user