Migrate webostv to new library and make integration async with callback state updates (#29296)

* migrate webostv to new aiopylgtv version of the library and add support
for generic commands, input/button commands, and callback state updates

* update requirements

* cleanup and bump aiopylgtv version

* update webostv unit tests

* make webostv unit tests work with python 3.7

* cleanup for code checks

* cleanup and code review

* make all client request functions coroutines

* make host required for webostv configuration

* remove generic command and button functionality plus related cleanup

* fix previous track function

* update unit tests

* fix imports for unit tests

* update unit test

* further unit test updates

* remove unnecessary setup call in unit tests

* restore previous behaviour with client key config file in hass configuration directory
This commit is contained in:
Josh Bendavid 2019-12-31 18:26:35 -05:00 committed by Martin Hjelmare
parent af153521dc
commit fc23b4f83f
7 changed files with 420 additions and 300 deletions

View File

@ -1 +1,152 @@
"""Support for WebOS TV."""
import asyncio
import logging
from aiopylgtv import PyLGTVCmdException, PyLGTVPairException, WebOsClient
import voluptuous as vol
from websockets.exceptions import ConnectionClosed
from homeassistant.const import (
CONF_CUSTOMIZE,
CONF_HOST,
CONF_ICON,
CONF_NAME,
EVENT_HOMEASSISTANT_STOP,
)
import homeassistant.helpers.config_validation as cv
DOMAIN = "webostv"
CONF_SOURCES = "sources"
CONF_ON_ACTION = "turn_on_action"
CONF_STANDBY_CONNECTION = "standby_connection"
DEFAULT_NAME = "LG webOS Smart TV"
WEBOSTV_CONFIG_FILE = "webostv.conf"
CUSTOMIZE_SCHEMA = vol.Schema(
{vol.Optional(CONF_SOURCES, default=[]): vol.All(cv.ensure_list, [cv.string])}
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
cv.ensure_list,
[
vol.Schema(
{
vol.Optional(CONF_CUSTOMIZE, default={}): CUSTOMIZE_SCHEMA,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_ON_ACTION): cv.SCRIPT_SCHEMA,
vol.Optional(
CONF_STANDBY_CONNECTION, default=False
): cv.boolean,
vol.Optional(CONF_ICON): cv.string,
}
)
],
)
},
extra=vol.ALLOW_EXTRA,
)
_LOGGER = logging.getLogger(__name__)
async def async_setup(hass, config):
"""Set up the LG WebOS TV platform."""
hass.data[DOMAIN] = {}
tasks = [async_setup_tv(hass, config, conf) for conf in config[DOMAIN]]
if tasks:
await asyncio.gather(*tasks)
return True
async def async_setup_tv(hass, config, conf):
"""Set up a LG WebOS TV based on host parameter."""
host = conf[CONF_HOST]
config_file = hass.config.path(WEBOSTV_CONFIG_FILE)
standby_connection = conf[CONF_STANDBY_CONNECTION]
client = WebOsClient(host, config_file, standby_connection=standby_connection)
hass.data[DOMAIN][host] = {"client": client}
if client.is_registered():
await async_setup_tv_finalize(hass, config, conf, client)
else:
_LOGGER.warning("LG webOS TV %s needs to be paired", host)
await async_request_configuration(hass, config, conf, client)
async def async_connect(client):
"""Attempt a connection, but fail gracefully if tv is off for example."""
try:
await client.connect()
except (
OSError,
ConnectionClosed,
ConnectionRefusedError,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVPairException,
PyLGTVCmdException,
):
pass
async def async_setup_tv_finalize(hass, config, conf, client):
"""Make initial connection attempt and call platform setup."""
async def async_on_stop(event):
"""Unregister callbacks and disconnect."""
client.clear_state_update_callbacks()
await client.disconnect()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_on_stop)
await async_connect(client)
hass.async_create_task(
hass.helpers.discovery.async_load_platform("media_player", DOMAIN, conf, config)
)
hass.async_create_task(
hass.helpers.discovery.async_load_platform("notify", DOMAIN, conf, config)
)
async def async_request_configuration(hass, config, conf, client):
"""Request configuration steps from the user."""
host = conf.get(CONF_HOST)
name = conf.get(CONF_NAME)
configurator = hass.components.configurator
async def lgtv_configuration_callback(data):
"""Handle actions when configuration callback is called."""
try:
await client.connect()
except PyLGTVPairException:
_LOGGER.warning("Connected to LG webOS TV %s but not paired", host)
return
except (
OSError,
ConnectionClosed,
ConnectionRefusedError,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVCmdException,
):
_LOGGER.error("Unable to connect to host %s", host)
return
await async_setup_tv_finalize(hass, config, conf, client)
configurator.async_request_done(request_id)
request_id = configurator.async_request_config(
name,
lgtv_configuration_callback,
description="Click start and accept the pairing request on your TV.",
description_image="/static/images/config_webos.png",
submit_caption="Start pairing request",
)

View File

@ -3,8 +3,7 @@
"name": "Webostv",
"documentation": "https://www.home-assistant.io/integrations/webostv",
"requirements": [
"pylgtv==0.1.9",
"websockets==6.0"
"aiopylgtv==0.2.4"
],
"dependencies": ["configurator"],
"codeowners": []

View File

@ -1,16 +1,14 @@
"""Support for interface with an LG webOS Smart TV."""
import asyncio
from datetime import timedelta
from functools import wraps
import logging
from typing import Dict
from urllib.parse import urlparse
from pylgtv import PyLGTVPairException, WebOsClient
import voluptuous as vol
from aiopylgtv import PyLGTVCmdException, PyLGTVPairException
from websockets.exceptions import ConnectionClosed
from homeassistant import util
from homeassistant.components.media_player import PLATFORM_SCHEMA, MediaPlayerDevice
from homeassistant.components.media_player import MediaPlayerDevice
from homeassistant.components.media_player.const import (
MEDIA_TYPE_CHANNEL,
SUPPORT_NEXT_TRACK,
@ -27,27 +25,21 @@ from homeassistant.components.media_player.const import (
)
from homeassistant.const import (
CONF_CUSTOMIZE,
CONF_FILENAME,
CONF_HOST,
CONF_NAME,
CONF_TIMEOUT,
STATE_OFF,
STATE_PAUSED,
STATE_PLAYING,
)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.script import Script
_CONFIGURING: Dict[str, str] = {}
from . import CONF_ON_ACTION, CONF_SOURCES, DOMAIN
_LOGGER = logging.getLogger(__name__)
CONF_SOURCES = "sources"
CONF_ON_ACTION = "turn_on_action"
DEFAULT_NAME = "LG webOS Smart TV"
LIVETV_APP_ID = "com.webos.app.livetv"
WEBOSTV_CONFIG_FILE = "webostv.conf"
SUPPORT_WEBOSTV = (
SUPPORT_TURN_OFF
@ -65,131 +57,65 @@ SUPPORT_WEBOSTV = (
MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10)
MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(seconds=1)
CUSTOMIZE_SCHEMA = vol.Schema(
{vol.Optional(CONF_SOURCES): vol.All(cv.ensure_list, [cv.string])}
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_CUSTOMIZE, default={}): CUSTOMIZE_SCHEMA,
vol.Optional(CONF_FILENAME, default=WEBOSTV_CONFIG_FILE): cv.string,
vol.Optional(CONF_HOST): cv.string,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_ON_ACTION): cv.SCRIPT_SCHEMA,
vol.Optional(CONF_TIMEOUT, default=8): cv.positive_int,
}
)
def setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up the LG WebOS TV platform."""
if discovery_info is not None:
host = urlparse(discovery_info[1]).hostname
else:
host = config.get(CONF_HOST)
if host is None:
_LOGGER.error("No TV found in configuration file or with discovery")
return False
# Only act if we are not already configuring this host
if host in _CONFIGURING:
if discovery_info is None:
return
name = config.get(CONF_NAME)
customize = config.get(CONF_CUSTOMIZE)
timeout = config.get(CONF_TIMEOUT)
turn_on_action = config.get(CONF_ON_ACTION)
host = discovery_info[CONF_HOST]
name = discovery_info[CONF_NAME]
customize = discovery_info[CONF_CUSTOMIZE]
turn_on_action = discovery_info.get(CONF_ON_ACTION)
config = hass.config.path(config.get(CONF_FILENAME))
client = hass.data[DOMAIN][host]["client"]
on_script = Script(hass, turn_on_action) if turn_on_action else None
setup_tv(host, name, customize, config, timeout, hass, add_entities, turn_on_action)
entity = LgWebOSMediaPlayerEntity(client, name, customize, on_script)
async_add_entities([entity], update_before_add=False)
def setup_tv(
host, name, customize, config, timeout, hass, add_entities, turn_on_action
):
"""Set up a LG WebOS TV based on host parameter."""
def cmd(func):
"""Catch command exceptions."""
client = WebOsClient(host, config, timeout)
if not client.is_registered():
if host in _CONFIGURING:
# Try to pair.
try:
client.register()
except PyLGTVPairException:
_LOGGER.warning("Connected to LG webOS TV %s but not paired", host)
return
except (OSError, ConnectionClosed, asyncio.TimeoutError):
_LOGGER.error("Unable to connect to host %s", host)
return
else:
# Not registered, request configuration.
_LOGGER.warning("LG webOS TV %s needs to be paired", host)
request_configuration(
host,
name,
customize,
config,
timeout,
hass,
add_entities,
turn_on_action,
@wraps(func)
async def wrapper(obj, *args, **kwargs):
"""Wrap all command methods."""
try:
await func(obj, *args, **kwargs)
except (
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVCmdException,
) as exc:
# If TV is off, we expect calls to fail.
if obj.state == STATE_OFF:
level = logging.INFO
else:
level = logging.ERROR
_LOGGER.log(
level,
"Error calling %s on entity %s: %r",
func.__name__,
obj.entity_id,
exc,
)
return
# If we came here and configuring this host, mark as done.
if client.is_registered() and host in _CONFIGURING:
request_id = _CONFIGURING.pop(host)
configurator = hass.components.configurator
configurator.request_done(request_id)
add_entities(
[LgWebOSDevice(host, name, customize, config, timeout, hass, turn_on_action)],
True,
)
return wrapper
def request_configuration(
host, name, customize, config, timeout, hass, add_entities, turn_on_action
):
"""Request configuration steps from the user."""
configurator = hass.components.configurator
# We got an error if this method is called while we are configuring
if host in _CONFIGURING:
configurator.notify_errors(
_CONFIGURING[host], "Failed to pair, please try again."
)
return
def lgtv_configuration_callback(data):
"""Handle actions when configuration callback is called."""
setup_tv(
host, name, customize, config, timeout, hass, add_entities, turn_on_action
)
_CONFIGURING[host] = configurator.request_config(
name,
lgtv_configuration_callback,
description="Click start and accept the pairing request on your TV.",
description_image="/static/images/config_webos.png",
submit_caption="Start pairing request",
)
class LgWebOSDevice(MediaPlayerDevice):
class LgWebOSMediaPlayerEntity(MediaPlayerDevice):
"""Representation of a LG WebOS TV."""
def __init__(self, host, name, customize, config, timeout, hass, on_action):
def __init__(self, client, name, customize, on_script=None):
"""Initialize the webos device."""
self._client = WebOsClient(host, config, timeout)
self._on_script = Script(hass, on_action) if on_action else None
self._customize = customize
self._client = client
self._name = name
self._customize = customize
self._on_script = on_script
# Assume that the TV is not muted
self._muted = False
# Assume that the TV is in Play mode
@ -200,64 +126,86 @@ class LgWebOSDevice(MediaPlayerDevice):
self._state = None
self._source_list = {}
self._app_list = {}
self._input_list = {}
self._channel = None
self._last_icon = None
@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
def update(self):
"""Retrieve the latest data."""
async def async_added_to_hass(self):
"""Connect and subscribe to state updates."""
await self._client.register_state_update_callback(
self.async_handle_state_update
)
try:
current_input = self._client.get_input()
if current_input is not None:
self._current_source_id = current_input
if self._state in (None, STATE_OFF):
self._state = STATE_PLAYING
else:
self._state = STATE_OFF
self._current_source = None
self._current_source_id = None
self._channel = None
# force state update if needed
if self._state is None:
await self.async_handle_state_update()
if self._state is not STATE_OFF:
self._muted = self._client.get_muted()
self._volume = self._client.get_volume()
self._channel = self._client.get_current_channel()
async def async_will_remove_from_hass(self):
"""Call disconnect on removal."""
self._client.unregister_state_update_callback(self.async_handle_state_update)
self._source_list = {}
self._app_list = {}
conf_sources = self._customize.get(CONF_SOURCES, [])
async def async_handle_state_update(self):
"""Update state from WebOsClient."""
self._current_source_id = self._client.current_appId
self._muted = self._client.muted
self._volume = self._client.volume
self._channel = self._client.current_channel
self._app_list = self._client.apps
self._input_list = self._client.inputs
for app in self._client.get_apps():
self._app_list[app["id"]] = app
if app["id"] == self._current_source_id:
self._current_source = app["title"]
self._source_list[app["title"]] = app
elif (
not conf_sources
or app["id"] in conf_sources
or any(word in app["title"] for word in conf_sources)
or any(word in app["id"] for word in conf_sources)
):
self._source_list[app["title"]] = app
for source in self._client.get_inputs():
if source["id"] == self._current_source_id:
self._current_source = source["label"]
self._source_list[source["label"]] = source
elif (
not conf_sources
or source["label"] in conf_sources
or any(
source["label"].find(word) != -1 for word in conf_sources
)
):
self._source_list[source["label"]] = source
except (OSError, ConnectionClosed, TypeError, asyncio.TimeoutError):
if self._current_source_id == "":
self._state = STATE_OFF
self._current_source = None
self._current_source_id = None
self._channel = None
else:
self._state = STATE_PLAYING
self.update_sources()
self.async_schedule_update_ha_state(False)
def update_sources(self):
"""Update list of sources from current source, apps, inputs and configured list."""
self._source_list = {}
conf_sources = self._customize[CONF_SOURCES]
for app in self._app_list.values():
if app["id"] == self._current_source_id:
self._current_source = app["title"]
self._source_list[app["title"]] = app
elif (
not conf_sources
or app["id"] in conf_sources
or any(word in app["title"] for word in conf_sources)
or any(word in app["id"] for word in conf_sources)
):
self._source_list[app["title"]] = app
for source in self._input_list.values():
if source["appId"] == self._current_source_id:
self._current_source = source["label"]
self._source_list[source["label"]] = source
elif (
not conf_sources
or source["label"] in conf_sources
or any(source["label"].find(word) != -1 for word in conf_sources)
):
self._source_list[source["label"]] = source
@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
async def async_update(self):
"""Connect."""
if not self._client.is_connected():
try:
await self._client.connect()
except (
OSError,
ConnectionClosed,
ConnectionRefusedError,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVPairException,
PyLGTVCmdException,
):
pass
@property
def name(self):
@ -326,46 +274,54 @@ class LgWebOSDevice(MediaPlayerDevice):
return SUPPORT_WEBOSTV | SUPPORT_TURN_ON
return SUPPORT_WEBOSTV
def turn_off(self):
@cmd
async def async_turn_off(self):
"""Turn off media player."""
await self._client.power_off()
self._state = STATE_OFF
try:
self._client.power_off()
except (OSError, ConnectionClosed, TypeError, asyncio.TimeoutError):
pass
def turn_on(self):
async def async_turn_on(self):
"""Turn on the media player."""
connected = self._client.is_connected()
if self._on_script:
self._on_script.run()
await self._on_script.async_run()
def volume_up(self):
# if connection was already active
# ensure is still alive
if connected:
await self._client.get_current_app()
@cmd
async def async_volume_up(self):
"""Volume up the media player."""
self._client.volume_up()
await self._client.volume_up()
def volume_down(self):
@cmd
async def async_volume_down(self):
"""Volume down media player."""
self._client.volume_down()
await self._client.volume_down()
def set_volume_level(self, volume):
@cmd
async def async_set_volume_level(self, volume):
"""Set volume level, range 0..1."""
tv_volume = volume * 100
self._client.set_volume(tv_volume)
await self._client.set_volume(tv_volume)
def mute_volume(self, mute):
@cmd
async def async_mute_volume(self, mute):
"""Send mute command."""
self._muted = mute
self._client.set_mute(mute)
await self._client.set_mute(mute)
def media_play_pause(self):
@cmd
async def async_media_play_pause(self):
"""Simulate play pause media player."""
if self._playing:
self.media_pause()
await self.media_pause()
else:
self.media_play()
await self.media_play()
def select_source(self, source):
@cmd
async def async_select_source(self, source):
"""Select input source."""
source_dict = self._source_list.get(source)
if source_dict is None:
@ -374,12 +330,13 @@ class LgWebOSDevice(MediaPlayerDevice):
self._current_source_id = source_dict["id"]
if source_dict.get("title"):
self._current_source = source_dict["title"]
self._client.launch_app(source_dict["id"])
await self._client.launch_app(source_dict["id"])
elif source_dict.get("label"):
self._current_source = source_dict["label"]
self._client.set_input(source_dict["id"])
await self._client.set_input(source_dict["id"])
def play_media(self, media_type, media_id, **kwargs):
@cmd
async def async_play_media(self, media_type, media_id, **kwargs):
"""Play a piece of media."""
_LOGGER.debug("Call play media type <%s>, Id <%s>", media_type, media_id)
@ -405,40 +362,47 @@ class LgWebOSDevice(MediaPlayerDevice):
"Switching to channel <%s> with perfect match",
perfect_match_channel_id,
)
self._client.set_channel(perfect_match_channel_id)
await self._client.set_channel(perfect_match_channel_id)
elif partial_match_channel_id is not None:
_LOGGER.info(
"Switching to channel <%s> with partial match",
partial_match_channel_id,
)
self._client.set_channel(partial_match_channel_id)
await self._client.set_channel(partial_match_channel_id)
return
def media_play(self):
@cmd
async def async_media_play(self):
"""Send play command."""
self._playing = True
self._state = STATE_PLAYING
self._client.play()
await self._client.play()
def media_pause(self):
@cmd
async def async_media_pause(self):
"""Send media pause command to media player."""
self._playing = False
self._state = STATE_PAUSED
self._client.pause()
await self._client.pause()
def media_next_track(self):
@cmd
async def async_media_stop(self):
"""Send stop command to media player."""
await self._client.stop()
@cmd
async def async_media_next_track(self):
"""Send next track command."""
current_input = self._client.get_input()
if current_input == LIVETV_APP_ID:
self._client.channel_up()
await self._client.channel_up()
else:
self._client.fast_forward()
await self._client.fast_forward()
def media_previous_track(self):
@cmd
async def async_media_previous_track(self):
"""Send the previous track command."""
current_input = self._client.get_input()
if current_input == LIVETV_APP_ID:
self._client.channel_down()
await self._client.channel_down()
else:
self._client.rewind()
await self._client.rewind()

View File

@ -1,47 +1,29 @@
"""Support for LG WebOS TV notification service."""
import asyncio
import logging
from pylgtv import PyLGTVPairException, WebOsClient
import voluptuous as vol
from aiopylgtv import PyLGTVCmdException, PyLGTVPairException
from websockets.exceptions import ConnectionClosed
from homeassistant.components.notify import (
ATTR_DATA,
PLATFORM_SCHEMA,
BaseNotificationService,
)
from homeassistant.const import CONF_FILENAME, CONF_HOST, CONF_ICON
import homeassistant.helpers.config_validation as cv
from homeassistant.components.notify import ATTR_DATA, BaseNotificationService
from homeassistant.const import CONF_HOST, CONF_ICON
from . import DOMAIN
_LOGGER = logging.getLogger(__name__)
WEBOSTV_CONFIG_FILE = "webostv.conf"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_FILENAME, default=WEBOSTV_CONFIG_FILE): cv.string,
vol.Optional(CONF_ICON): cv.string,
}
)
def get_service(hass, config, discovery_info=None):
async def async_get_service(hass, config, discovery_info=None):
"""Return the notify service."""
path = hass.config.path(config.get(CONF_FILENAME))
client = WebOsClient(config.get(CONF_HOST), key_file_path=path, timeout_connect=8)
host = discovery_info.get(CONF_HOST)
icon_path = discovery_info.get(CONF_ICON)
if not client.is_registered():
try:
client.register()
except PyLGTVPairException:
_LOGGER.error("Pairing with TV failed")
return None
except OSError:
_LOGGER.error("TV unreachable")
return None
client = hass.data[DOMAIN][host]["client"]
return LgWebOSNotificationService(client, config.get(CONF_ICON))
svc = LgWebOSNotificationService(client, icon_path)
return svc
class LgWebOSNotificationService(BaseNotificationService):
@ -52,18 +34,27 @@ class LgWebOSNotificationService(BaseNotificationService):
self._client = client
self._icon_path = icon_path
def send_message(self, message="", **kwargs):
async def async_send_message(self, message="", **kwargs):
"""Send a message to the tv."""
try:
if not self._client.is_connected():
await self._client.connect()
data = kwargs.get(ATTR_DATA)
icon_path = (
data.get(CONF_ICON, self._icon_path) if data else self._icon_path
)
self._client.send_message(message, icon_path=icon_path)
await self._client.send_message(message, icon_path=icon_path)
except PyLGTVPairException:
_LOGGER.error("Pairing with TV failed")
except FileNotFoundError:
_LOGGER.error("Icon %s not found", icon_path)
except OSError:
except (
OSError,
ConnectionClosed,
ConnectionRefusedError,
asyncio.TimeoutError,
asyncio.CancelledError,
PyLGTVCmdException,
):
_LOGGER.error("TV unreachable")

View File

@ -186,6 +186,9 @@ aionotion==1.1.0
# homeassistant.components.hunterdouglas_powerview
aiopvapi==1.6.14
# homeassistant.components.webostv
aiopylgtv==0.2.4
# homeassistant.components.switcher_kis
aioswitcher==2019.4.26
@ -1325,9 +1328,6 @@ pylaunches==0.2.0
# homeassistant.components.lg_netcast
pylgnetcast-homeassistant==0.2.0.dev0
# homeassistant.components.webostv
pylgtv==0.1.9
# homeassistant.components.linky
pylinky==0.4.0
@ -2035,9 +2035,6 @@ webexteamssdk==1.1.1
# homeassistant.components.gpmdp
websocket-client==0.54.0
# homeassistant.components.webostv
websockets==6.0
# homeassistant.components.wirelesstag
wirelesstagpy==0.4.0

View File

@ -68,6 +68,9 @@ aiohue==1.10.1
# homeassistant.components.notion
aionotion==1.1.0
# homeassistant.components.webostv
aiopylgtv==0.2.4
# homeassistant.components.switcher_kis
aioswitcher==2019.4.26
@ -442,9 +445,6 @@ pyiqvia==0.2.1
# homeassistant.components.kira
pykira==0.1.1
# homeassistant.components.webostv
pylgtv==0.1.9
# homeassistant.components.linky
pylinky==0.4.0
@ -636,9 +636,6 @@ wakeonlan==1.1.6
# homeassistant.components.folder_watcher
watchdog==0.8.3
# homeassistant.components.webostv
websockets==6.0
# homeassistant.components.withings
withings-api==2.1.3

View File

@ -1,56 +1,77 @@
"""The tests for the LG webOS media player platform."""
import unittest
from unittest import mock
import sys
from homeassistant.components.webostv import media_player as webostv
import pytest
from homeassistant.components import media_player
from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE,
ATTR_MEDIA_VOLUME_MUTED,
SERVICE_SELECT_SOURCE,
)
from homeassistant.components.webostv import DOMAIN
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_HOST,
CONF_NAME,
SERVICE_VOLUME_MUTE,
)
from homeassistant.setup import async_setup_component
if sys.version_info >= (3, 8, 0):
from unittest.mock import patch
else:
from asynctest import patch
class FakeLgWebOSDevice(webostv.LgWebOSDevice):
"""A fake device without the client setup required for the real one."""
def __init__(self, *args, **kwargs):
"""Initialise parameters needed for tests with fake values."""
self._source_list = {}
self._client = mock.MagicMock()
self._name = "fake_device"
self._current_source = None
NAME = "fake"
ENTITY_ID = f"{media_player.DOMAIN}.{NAME}"
class TestLgWebOSDevice(unittest.TestCase):
"""Test the LgWebOSDevice class."""
@pytest.fixture(name="client")
def client_fixture():
"""Patch of client library for tests."""
with patch(
"homeassistant.components.webostv.WebOsClient", autospec=True
) as mock_client_class:
yield mock_client_class.return_value
def setUp(self):
"""Configure a fake device for each test."""
self.device = FakeLgWebOSDevice()
def test_select_source_with_empty_source_list(self):
"""Ensure we don't call client methods when we don't have sources."""
self.device.select_source("nonexistent")
assert 0 == self.device._client.launch_app.call_count
assert 0 == self.device._client.set_input.call_count
async def setup_webostv(hass):
"""Initialize webostv and media_player for tests."""
assert await async_setup_component(
hass, DOMAIN, {DOMAIN: {CONF_HOST: "fake", CONF_NAME: NAME}},
)
await hass.async_block_till_done()
def test_select_source_with_titled_entry(self):
"""Test that a titled source is treated as an app."""
self.device._source_list = {
"existent": {"id": "existent_id", "title": "existent_title"}
}
self.device.select_source("existent")
async def test_mute(hass, client):
"""Test simple service call."""
assert "existent_title" == self.device._current_source
assert [mock.call("existent_id")] == (
self.device._client.launch_app.call_args_list
)
await setup_webostv(hass)
def test_select_source_with_labelled_entry(self):
"""Test that a labelled source is treated as an input source."""
self.device._source_list = {
"existent": {"id": "existent_id", "label": "existent_label"}
}
data = {
ATTR_ENTITY_ID: ENTITY_ID,
ATTR_MEDIA_VOLUME_MUTED: True,
}
await hass.services.async_call(media_player.DOMAIN, SERVICE_VOLUME_MUTE, data)
await hass.async_block_till_done()
self.device.select_source("existent")
client.set_mute.assert_called_once()
assert "existent_label" == self.device._current_source
assert [mock.call("existent_id")] == (
self.device._client.set_input.call_args_list
)
async def test_select_source_with_empty_source_list(hass, client):
"""Ensure we don't call client methods when we don't have sources."""
await setup_webostv(hass)
data = {
ATTR_ENTITY_ID: ENTITY_ID,
ATTR_INPUT_SOURCE: "nonexistent",
}
await hass.services.async_call(media_player.DOMAIN, SERVICE_SELECT_SOURCE, data)
await hass.async_block_till_done()
assert hass.states.is_state(ENTITY_ID, "playing")
client.launch_app.assert_not_called()
client.set_input.assert_not_called()