Add support for v6 features to philips js integration (#46422)

This commit is contained in:
Joakim Plate 2021-02-26 18:34:40 +01:00 committed by GitHub
parent 7ab2d91bf0
commit e12eba1989
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 702 additions and 138 deletions

View File

@ -8,8 +8,13 @@ from haphilipsjs import ConnectionFailure, PhilipsTV
from homeassistant.components.automation import AutomationActionType
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_API_VERSION, CONF_HOST
from homeassistant.core import Context, HassJob, HomeAssistant, callback
from homeassistant.const import (
CONF_API_VERSION,
CONF_HOST,
CONF_PASSWORD,
CONF_USERNAME,
)
from homeassistant.core import CALLBACK_TYPE, Context, HassJob, HomeAssistant, callback
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
@ -30,7 +35,12 @@ async def async_setup(hass: HomeAssistant, config: dict):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry):
"""Set up Philips TV from a config entry."""
tvapi = PhilipsTV(entry.data[CONF_HOST], entry.data[CONF_API_VERSION])
tvapi = PhilipsTV(
entry.data[CONF_HOST],
entry.data[CONF_API_VERSION],
username=entry.data.get(CONF_USERNAME),
password=entry.data.get(CONF_PASSWORD),
)
coordinator = PhilipsTVDataUpdateCoordinator(hass, tvapi)
@ -103,7 +113,9 @@ class PhilipsTVDataUpdateCoordinator(DataUpdateCoordinator[None]):
def __init__(self, hass, api: PhilipsTV) -> None:
"""Set up the coordinator."""
self.api = api
self._notify_future: Optional[asyncio.Task] = None
@callback
def _update_listeners():
for update_callback in self._listeners:
update_callback()
@ -120,9 +132,43 @@ class PhilipsTVDataUpdateCoordinator(DataUpdateCoordinator[None]):
),
)
async def _notify_task(self):
while self.api.on and self.api.notify_change_supported:
if await self.api.notifyChange(130):
self.async_set_updated_data(None)
@callback
def _async_notify_stop(self):
if self._notify_future:
self._notify_future.cancel()
self._notify_future = None
@callback
def _async_notify_schedule(self):
if (
(self._notify_future is None or self._notify_future.done())
and self.api.on
and self.api.notify_change_supported
):
self._notify_future = self.hass.loop.create_task(self._notify_task())
@callback
def async_remove_listener(self, update_callback: CALLBACK_TYPE) -> None:
"""Remove data update."""
super().async_remove_listener(update_callback)
if not self._listeners:
self._async_notify_stop()
@callback
def _async_stop_refresh(self, event: asyncio.Event) -> None:
super()._async_stop_refresh(event)
self._async_notify_stop()
@callback
async def _async_update_data(self):
"""Fetch the latest data from the source."""
try:
await self.hass.async_add_executor_job(self.api.update)
await self.api.update()
self._async_notify_schedule()
except ConnectionFailure:
pass

View File

@ -1,35 +1,47 @@
"""Config flow for Philips TV integration."""
import logging
from typing import Any, Dict, Optional, TypedDict
import platform
from typing import Any, Dict, Optional, Tuple, TypedDict
from haphilipsjs import ConnectionFailure, PhilipsTV
from haphilipsjs import ConnectionFailure, PairingFailure, PhilipsTV
import voluptuous as vol
from homeassistant import config_entries, core
from homeassistant.const import CONF_API_VERSION, CONF_HOST
from homeassistant.const import (
CONF_API_VERSION,
CONF_HOST,
CONF_PASSWORD,
CONF_PIN,
CONF_USERNAME,
)
from .const import DOMAIN # pylint:disable=unused-import
_LOGGER = logging.getLogger(__name__)
from . import LOGGER
from .const import ( # pylint:disable=unused-import
CONF_SYSTEM,
CONST_APP_ID,
CONST_APP_NAME,
DOMAIN,
)
class FlowUserDict(TypedDict):
"""Data for user step."""
host: str
api_version: int
async def validate_input(hass: core.HomeAssistant, data: FlowUserDict):
async def validate_input(
hass: core.HomeAssistant, host: str, api_version: int
) -> Tuple[Dict, PhilipsTV]:
"""Validate the user input allows us to connect."""
hub = PhilipsTV(data[CONF_HOST], data[CONF_API_VERSION])
hub = PhilipsTV(host, api_version)
await hass.async_add_executor_job(hub.getSystem)
await hub.getSystem()
await hub.setTransport(hub.secured_transport)
if hub.system is None:
raise ConnectionFailure
if not hub.system:
raise ConnectionFailure("System data is empty")
return hub.system
return hub
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
@ -38,7 +50,9 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_POLL
_default = {}
_current = {}
_hub: PhilipsTV
_pair_state: Any
async def async_step_import(self, conf: Dict[str, Any]):
"""Import a configuration from config.yaml."""
@ -53,34 +67,99 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
}
)
async def _async_create_current(self):
system = self._current[CONF_SYSTEM]
return self.async_create_entry(
title=f"{system['name']} ({system['serialnumber']})",
data=self._current,
)
async def async_step_pair(self, user_input: Optional[Dict] = None):
"""Attempt to pair with device."""
assert self._hub
errors = {}
schema = vol.Schema(
{
vol.Required(CONF_PIN): str,
}
)
if not user_input:
try:
self._pair_state = await self._hub.pairRequest(
CONST_APP_ID,
CONST_APP_NAME,
platform.node(),
platform.system(),
"native",
)
except PairingFailure as exc:
LOGGER.debug(str(exc))
return self.async_abort(
reason="pairing_failure",
description_placeholders={"error_id": exc.data.get("error_id")},
)
return self.async_show_form(
step_id="pair", data_schema=schema, errors=errors
)
try:
username, password = await self._hub.pairGrant(
self._pair_state, user_input[CONF_PIN]
)
except PairingFailure as exc:
LOGGER.debug(str(exc))
if exc.data.get("error_id") == "INVALID_PIN":
errors[CONF_PIN] = "invalid_pin"
return self.async_show_form(
step_id="pair", data_schema=schema, errors=errors
)
return self.async_abort(
reason="pairing_failure",
description_placeholders={"error_id": exc.data.get("error_id")},
)
self._current[CONF_USERNAME] = username
self._current[CONF_PASSWORD] = password
return await self._async_create_current()
async def async_step_user(self, user_input: Optional[FlowUserDict] = None):
"""Handle the initial step."""
errors = {}
if user_input:
self._default = user_input
self._current = user_input
try:
system = await validate_input(self.hass, user_input)
except ConnectionFailure:
hub = await validate_input(
self.hass, user_input[CONF_HOST], user_input[CONF_API_VERSION]
)
except ConnectionFailure as exc:
LOGGER.error(str(exc))
errors["base"] = "cannot_connect"
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Unexpected exception")
LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
await self.async_set_unique_id(system["serialnumber"])
self._abort_if_unique_id_configured(updates=user_input)
data = {**user_input, "system": system}
await self.async_set_unique_id(hub.system["serialnumber"])
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=f"{system['name']} ({system['serialnumber']})", data=data
)
self._current[CONF_SYSTEM] = hub.system
self._current[CONF_API_VERSION] = hub.api_version
self._hub = hub
if hub.pairing_type == "digest_auth_pairing":
return await self.async_step_pair()
return await self._async_create_current()
schema = vol.Schema(
{
vol.Required(CONF_HOST, default=self._default.get(CONF_HOST)): str,
vol.Required(CONF_HOST, default=self._current.get(CONF_HOST)): str,
vol.Required(
CONF_API_VERSION, default=self._default.get(CONF_API_VERSION)
): vol.In([1, 6]),
CONF_API_VERSION, default=self._current.get(CONF_API_VERSION, 1)
): vol.In([1, 5, 6]),
}
)
return self.async_show_form(step_id="user", data_schema=schema, errors=errors)

View File

@ -2,3 +2,6 @@
DOMAIN = "philips_js"
CONF_SYSTEM = "system"
CONST_APP_ID = "homeassistant.io"
CONST_APP_NAME = "Home Assistant"

View File

@ -3,7 +3,7 @@
"name": "Philips TV",
"documentation": "https://www.home-assistant.io/integrations/philips_js",
"requirements": [
"ha-philipsjs==0.1.0"
"ha-philipsjs==2.3.0"
],
"codeowners": [
"@elupus"

View File

@ -1,6 +1,7 @@
"""Media Player component to integrate TVs exposing the Joint Space API."""
from typing import Any, Dict
from typing import Any, Dict, Optional
from haphilipsjs import ConnectionFailure
import voluptuous as vol
from homeassistant import config_entries
@ -11,15 +12,21 @@ from homeassistant.components.media_player import (
MediaPlayerEntity,
)
from homeassistant.components.media_player.const import (
MEDIA_CLASS_APP,
MEDIA_CLASS_CHANNEL,
MEDIA_CLASS_DIRECTORY,
MEDIA_TYPE_APP,
MEDIA_TYPE_APPS,
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_CHANNELS,
SUPPORT_BROWSE_MEDIA,
SUPPORT_NEXT_TRACK,
SUPPORT_PAUSE,
SUPPORT_PLAY,
SUPPORT_PLAY_MEDIA,
SUPPORT_PREVIOUS_TRACK,
SUPPORT_SELECT_SOURCE,
SUPPORT_STOP,
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
@ -27,7 +34,6 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_STEP,
)
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.philips_js import PhilipsTVDataUpdateCoordinator
from homeassistant.const import (
CONF_API_VERSION,
CONF_HOST,
@ -40,7 +46,7 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import LOGGER as _LOGGER
from . import LOGGER as _LOGGER, PhilipsTVDataUpdateCoordinator
from .const import CONF_SYSTEM, DOMAIN
SUPPORT_PHILIPS_JS = (
@ -53,16 +59,15 @@ SUPPORT_PHILIPS_JS = (
| SUPPORT_PREVIOUS_TRACK
| SUPPORT_PLAY_MEDIA
| SUPPORT_BROWSE_MEDIA
| SUPPORT_PLAY
| SUPPORT_PAUSE
| SUPPORT_STOP
)
CONF_ON_ACTION = "turn_on_action"
DEFAULT_API_VERSION = 1
PREFIX_SEPARATOR = ": "
PREFIX_SOURCE = "Input"
PREFIX_CHANNEL = "Channel"
PLATFORM_SCHEMA = vol.All(
cv.deprecated(CONF_HOST),
cv.deprecated(CONF_NAME),
@ -131,12 +136,19 @@ class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
self._supports = SUPPORT_PHILIPS_JS
self._system = system
self._unique_id = unique_id
self._state = STATE_OFF
self._media_content_type: Optional[str] = None
self._media_content_id: Optional[str] = None
self._media_title: Optional[str] = None
self._media_channel: Optional[str] = None
super().__init__(coordinator)
self._update_from_coordinator()
def _update_soon(self):
async def _async_update_soon(self):
"""Reschedule update task."""
self.hass.add_job(self.coordinator.async_request_refresh)
self.async_write_ha_state()
await self.coordinator.async_request_refresh()
@property
def name(self):
@ -147,7 +159,9 @@ class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
def supported_features(self):
"""Flag media player features that are supported."""
supports = self._supports
if self._coordinator.turn_on:
if self._coordinator.turn_on or (
self._tv.on and self._tv.powerstate is not None
):
supports |= SUPPORT_TURN_ON
return supports
@ -155,6 +169,7 @@ class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
def state(self):
"""Get the device state. An exception means OFF state."""
if self._tv.on:
if self._tv.powerstate == "On" or self._tv.powerstate is None:
return STATE_ON
return STATE_OFF
@ -168,22 +183,12 @@ class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
"""List of available input sources."""
return list(self._sources.values())
def select_source(self, source):
async def async_select_source(self, source):
"""Set the input source."""
data = source.split(PREFIX_SEPARATOR, 1)
if data[0] == PREFIX_SOURCE: # Legacy way to set source
source_id = _inverted(self._sources).get(data[1])
if source_id:
self._tv.setSource(source_id)
elif data[0] == PREFIX_CHANNEL: # Legacy way to set channel
channel_id = _inverted(self._channels).get(data[1])
if channel_id:
self._tv.setChannel(channel_id)
else:
source_id = _inverted(self._sources).get(source)
if source_id:
self._tv.setSource(source_id)
self._update_soon()
await self._tv.setSource(source_id)
await self._async_update_soon()
@property
def volume_level(self):
@ -197,78 +202,118 @@ class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
async def async_turn_on(self):
"""Turn on the device."""
if self._tv.on and self._tv.powerstate:
await self._tv.setPowerState("On")
self._state = STATE_ON
else:
await self._coordinator.turn_on.async_run(self.hass, self._context)
await self._async_update_soon()
def turn_off(self):
async def async_turn_off(self):
"""Turn off the device."""
self._tv.sendKey("Standby")
self._tv.on = False
self._update_soon()
await self._tv.sendKey("Standby")
self._state = STATE_OFF
await self._async_update_soon()
def volume_up(self):
async def async_volume_up(self):
"""Send volume up command."""
self._tv.sendKey("VolumeUp")
self._update_soon()
await self._tv.sendKey("VolumeUp")
await self._async_update_soon()
def volume_down(self):
async def async_volume_down(self):
"""Send volume down command."""
self._tv.sendKey("VolumeDown")
self._update_soon()
await self._tv.sendKey("VolumeDown")
await self._async_update_soon()
def mute_volume(self, mute):
async def async_mute_volume(self, mute):
"""Send mute command."""
self._tv.setVolume(None, mute)
self._update_soon()
if self._tv.muted != mute:
await self._tv.sendKey("Mute")
await self._async_update_soon()
else:
_LOGGER.debug("Ignoring request when already in expected state")
def set_volume_level(self, volume):
async def async_set_volume_level(self, volume):
"""Set volume level, range 0..1."""
self._tv.setVolume(volume, self._tv.muted)
self._update_soon()
await self._tv.setVolume(volume, self._tv.muted)
await self._async_update_soon()
def media_previous_track(self):
async def async_media_previous_track(self):
"""Send rewind command."""
self._tv.sendKey("Previous")
self._update_soon()
await self._tv.sendKey("Previous")
await self._async_update_soon()
def media_next_track(self):
async def async_media_next_track(self):
"""Send fast forward command."""
self._tv.sendKey("Next")
self._update_soon()
await self._tv.sendKey("Next")
await self._async_update_soon()
async def async_media_play_pause(self):
"""Send pause command to media player."""
if self._tv.quirk_playpause_spacebar:
await self._tv.sendUnicode(" ")
else:
await self._tv.sendKey("PlayPause")
await self._async_update_soon()
async def async_media_play(self):
"""Send pause command to media player."""
await self._tv.sendKey("Play")
await self._async_update_soon()
async def async_media_pause(self):
"""Send play command to media player."""
await self._tv.sendKey("Pause")
await self._async_update_soon()
async def async_media_stop(self):
"""Send play command to media player."""
await self._tv.sendKey("Stop")
await self._async_update_soon()
@property
def media_channel(self):
"""Get current channel if it's a channel."""
if self.media_content_type == MEDIA_TYPE_CHANNEL:
return self._channels.get(self._tv.channel_id)
return None
return self._media_channel
@property
def media_title(self):
"""Title of current playing media."""
if self.media_content_type == MEDIA_TYPE_CHANNEL:
return self._channels.get(self._tv.channel_id)
return self._sources.get(self._tv.source_id)
return self._media_title
@property
def media_content_type(self):
"""Return content type of playing media."""
if self._tv.source_id == "tv" or self._tv.source_id == "11":
return MEDIA_TYPE_CHANNEL
if self._tv.source_id is None and self._tv.channels:
return MEDIA_TYPE_CHANNEL
return None
return self._media_content_type
@property
def media_content_id(self):
"""Content type of current playing media."""
if self.media_content_type == MEDIA_TYPE_CHANNEL:
return self._channels.get(self._tv.channel_id)
return self._media_content_id
@property
def media_image_url(self):
"""Image url of current playing media."""
if self._media_content_id and self._media_content_type in (
MEDIA_CLASS_APP,
MEDIA_CLASS_CHANNEL,
):
return self.get_browse_image_url(
self._media_content_type, self._media_content_id, media_image_id=None
)
return None
@property
def device_state_attributes(self):
"""Return the state attributes."""
return {"channel_list": list(self._channels.values())}
def app_id(self):
"""ID of the current running app."""
return self._tv.application_id
@property
def app_name(self):
"""Name of the current running app."""
app = self._tv.applications.get(self._tv.application_id)
if app:
return app.get("label")
@property
def device_class(self):
@ -293,57 +338,243 @@ class PhilipsTVMediaPlayer(CoordinatorEntity, MediaPlayerEntity):
"sw_version": self._system.get("softwareversion"),
}
def play_media(self, media_type, media_id, **kwargs):
async def async_play_media(self, media_type, media_id, **kwargs):
"""Play a piece of media."""
_LOGGER.debug("Call play media type <%s>, Id <%s>", media_type, media_id)
if media_type == MEDIA_TYPE_CHANNEL:
channel_id = _inverted(self._channels).get(media_id)
list_id, _, channel_id = media_id.partition("/")
if channel_id:
self._tv.setChannel(channel_id)
self._update_soon()
await self._tv.setChannel(channel_id, list_id)
await self._async_update_soon()
else:
_LOGGER.error("Unable to find channel <%s>", media_id)
elif media_type == MEDIA_TYPE_APP:
app = self._tv.applications.get(media_id)
if app:
await self._tv.setApplication(app["intent"])
await self._async_update_soon()
else:
_LOGGER.error("Unable to find application <%s>", media_id)
else:
_LOGGER.error("Unsupported media type <%s>", media_type)
async def async_browse_media(self, media_content_type=None, media_content_id=None):
"""Implement the websocket media browsing helper."""
if media_content_id not in (None, ""):
raise BrowseError(
f"Media not found: {media_content_type} / {media_content_id}"
async def async_browse_media_channels(self, expanded):
"""Return channel media objects."""
if expanded:
children = [
BrowseMedia(
title=channel.get("name", f"Channel: {channel_id}"),
media_class=MEDIA_CLASS_CHANNEL,
media_content_id=f"alltv/{channel_id}",
media_content_type=MEDIA_TYPE_CHANNEL,
can_play=True,
can_expand=False,
thumbnail=self.get_browse_image_url(
MEDIA_TYPE_APP, channel_id, media_image_id=None
),
)
for channel_id, channel in self._tv.channels.items()
]
else:
children = None
return BrowseMedia(
title="Channels",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="",
media_content_id="channels",
media_content_type=MEDIA_TYPE_CHANNELS,
children_media_class=MEDIA_TYPE_CHANNEL,
can_play=False,
can_expand=True,
children=children,
)
async def async_browse_media_favorites(self, list_id, expanded):
"""Return channel media objects."""
if expanded:
favorites = await self._tv.getFavoriteList(list_id)
if favorites:
def get_name(channel):
channel_data = self._tv.channels.get(str(channel["ccid"]))
if channel_data:
return channel_data["name"]
return f"Channel: {channel['ccid']}"
children = [
BrowseMedia(
title=channel,
title=get_name(channel),
media_class=MEDIA_CLASS_CHANNEL,
media_content_id=channel,
media_content_id=f"{list_id}/{channel['ccid']}",
media_content_type=MEDIA_TYPE_CHANNEL,
can_play=True,
can_expand=False,
thumbnail=self.get_browse_image_url(
MEDIA_TYPE_APP, channel, media_image_id=None
),
)
for channel in self._channels.values()
for channel in favorites
]
else:
children = None
else:
children = None
favorite = self._tv.favorite_lists[list_id]
return BrowseMedia(
title=favorite.get("name", f"Favorites {list_id}"),
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id=f"favorites/{list_id}",
media_content_type=MEDIA_TYPE_CHANNELS,
children_media_class=MEDIA_TYPE_CHANNEL,
can_play=False,
can_expand=True,
children=children,
)
async def async_browse_media_applications(self, expanded):
"""Return application media objects."""
if expanded:
children = [
BrowseMedia(
title=application["label"],
media_class=MEDIA_CLASS_APP,
media_content_id=application_id,
media_content_type=MEDIA_TYPE_APP,
can_play=True,
can_expand=False,
thumbnail=self.get_browse_image_url(
MEDIA_TYPE_APP, application_id, media_image_id=None
),
)
for application_id, application in self._tv.applications.items()
]
else:
children = None
return BrowseMedia(
title="Applications",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="applications",
media_content_type=MEDIA_TYPE_APPS,
children_media_class=MEDIA_TYPE_APP,
can_play=False,
can_expand=True,
children=children,
)
async def async_browse_media_favorite_lists(self, expanded):
"""Return favorite media objects."""
if self._tv.favorite_lists and expanded:
children = [
await self.async_browse_media_favorites(list_id, False)
for list_id in self._tv.favorite_lists
]
else:
children = None
return BrowseMedia(
title="Favorites",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="favorite_lists",
media_content_type=MEDIA_TYPE_CHANNELS,
children_media_class=MEDIA_TYPE_CHANNEL,
can_play=False,
can_expand=True,
children=children,
)
async def async_browse_media_root(self):
"""Return root media objects."""
return BrowseMedia(
title="Library",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_id="",
media_content_type="",
can_play=False,
can_expand=True,
children=[
await self.async_browse_media_channels(False),
await self.async_browse_media_applications(False),
await self.async_browse_media_favorite_lists(False),
],
)
async def async_browse_media(self, media_content_type=None, media_content_id=None):
"""Implement the websocket media browsing helper."""
if not self._tv.on:
raise BrowseError("Can't browse when tv is turned off")
if media_content_id in (None, ""):
return await self.async_browse_media_root()
path = media_content_id.partition("/")
if path[0] == "channels":
return await self.async_browse_media_channels(True)
if path[0] == "applications":
return await self.async_browse_media_applications(True)
if path[0] == "favorite_lists":
return await self.async_browse_media_favorite_lists(True)
if path[0] == "favorites":
return await self.async_browse_media_favorites(path[2], True)
raise BrowseError(f"Media not found: {media_content_type} / {media_content_id}")
async def async_get_browse_image(
self, media_content_type, media_content_id, media_image_id=None
):
"""Serve album art. Returns (content, content_type)."""
try:
if media_content_type == MEDIA_TYPE_APP and media_content_id:
return await self._tv.getApplicationIcon(media_content_id)
if media_content_type == MEDIA_TYPE_CHANNEL and media_content_id:
return await self._tv.getChannelLogo(media_content_id)
except ConnectionFailure:
_LOGGER.warning("Failed to fetch image")
return None, None
async def async_get_media_image(self):
"""Serve album art. Returns (content, content_type)."""
return await self.async_get_browse_image(
self.media_content_type, self.media_content_id, None
)
@callback
def _update_from_coordinator(self):
if self._tv.on:
if self._tv.powerstate in ("Standby", "StandbyKeep"):
self._state = STATE_OFF
else:
self._state = STATE_ON
else:
self._state = STATE_OFF
self._sources = {
srcid: source.get("name") or f"Source {srcid}"
for srcid, source in (self._tv.sources or {}).items()
}
self._channels = {
chid: channel.get("name") or f"Channel {chid}"
for chid, channel in (self._tv.channels or {}).items()
}
if self._tv.channel_active:
self._media_content_type = MEDIA_TYPE_CHANNEL
self._media_content_id = f"all/{self._tv.channel_id}"
self._media_title = self._tv.channels.get(self._tv.channel_id, {}).get(
"name"
)
self._media_channel = self._media_title
elif self._tv.application_id:
self._media_content_type = MEDIA_TYPE_APP
self._media_content_id = self._tv.application_id
self._media_title = self._tv.applications.get(
self._tv.application_id, {}
).get("label")
self._media_channel = None
else:
self._media_content_type = None
self._media_content_id = None
self._media_title = self._sources.get(self._tv.source_id)
self._media_channel = None
@callback
def _handle_coordinator_update(self) -> None:

View File

@ -10,7 +10,9 @@
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
"unknown": "[%key:common::config_flow::error::unknown%]",
"pairing_failure": "Unable to pair: {error_id}",
"invalid_pin": "Invalid PIN"
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"

View File

@ -5,7 +5,9 @@
},
"error": {
"cannot_connect": "Failed to connect",
"unknown": "Unexpected error"
"unknown": "Unexpected error",
"pairing_failure": "Unable to pair: {error_id}",
"invalid_pin": "Invalid PIN"
},
"step": {
"user": {

View File

@ -721,7 +721,7 @@ guppy3==3.1.0
ha-ffmpeg==3.0.2
# homeassistant.components.philips_js
ha-philipsjs==0.1.0
ha-philipsjs==2.3.0
# homeassistant.components.habitica
habitipy==0.2.0

View File

@ -382,7 +382,7 @@ guppy3==3.1.0
ha-ffmpeg==3.0.2
# homeassistant.components.philips_js
ha-philipsjs==0.1.0
ha-philipsjs==2.3.0
# homeassistant.components.habitica
habitipy==0.2.0

View File

@ -3,6 +3,9 @@
MOCK_SERIAL_NO = "1234567890"
MOCK_NAME = "Philips TV"
MOCK_USERNAME = "mock_user"
MOCK_PASSWORD = "mock_password"
MOCK_SYSTEM = {
"menulanguage": "English",
"name": MOCK_NAME,
@ -12,14 +15,63 @@ MOCK_SYSTEM = {
"model": "modelname",
}
MOCK_USERINPUT = {
"host": "1.1.1.1",
"api_version": 1,
MOCK_SYSTEM_UNPAIRED = {
"menulanguage": "Dutch",
"name": "55PUS7181/12",
"country": "Netherlands",
"serialnumber": "ABCDEFGHIJKLF",
"softwareversion": "TPM191E_R.101.001.208.001",
"model": "65OLED855/12",
"deviceid": "1234567890",
"nettvversion": "6.0.2",
"epgsource": "one",
"api_version": {"Major": 6, "Minor": 2, "Patch": 0},
"featuring": {
"jsonfeatures": {
"editfavorites": ["TVChannels", "SatChannels"],
"recordings": ["List", "Schedule", "Manage"],
"ambilight": ["LoungeLight", "Hue", "Ambilight"],
"menuitems": ["Setup_Menu"],
"textentry": [
"context_based",
"initial_string_available",
"editor_info_available",
],
"applications": ["TV_Apps", "TV_Games", "TV_Settings"],
"pointer": ["not_available"],
"inputkey": ["key"],
"activities": ["intent"],
"channels": ["preset_string"],
"mappings": ["server_mapping"],
},
"systemfeatures": {
"tvtype": "consumer",
"content": ["dmr", "dms_tad"],
"tvsearch": "intent",
"pairing_type": "digest_auth_pairing",
"secured_transport": "True",
},
},
}
MOCK_USERINPUT = {
"host": "1.1.1.1",
}
MOCK_IMPORT = {"host": "1.1.1.1", "api_version": 6}
MOCK_CONFIG = {
**MOCK_USERINPUT,
"host": "1.1.1.1",
"api_version": 1,
"system": MOCK_SYSTEM,
}
MOCK_CONFIG_PAIRED = {
"host": "1.1.1.1",
"api_version": 6,
"username": MOCK_USERNAME,
"password": MOCK_PASSWORD,
"system": MOCK_SYSTEM_UNPAIRED,
}
MOCK_ENTITY_ID = "media_player.philips_tv"

View File

@ -1,6 +1,7 @@
"""Standard setup for tests."""
from unittest.mock import Mock, patch
from unittest.mock import create_autospec, patch
from haphilipsjs import PhilipsTV
from pytest import fixture
from homeassistant import setup
@ -20,10 +21,18 @@ async def setup_notification(hass):
@fixture(autouse=True)
def mock_tv():
"""Disable component actual use."""
tv = Mock(autospec="philips_js.PhilipsTV")
tv = create_autospec(PhilipsTV)
tv.sources = {}
tv.channels = {}
tv.application = None
tv.applications = {}
tv.system = MOCK_SYSTEM
tv.api_version = 1
tv.api_version_detected = None
tv.on = True
tv.notify_change_supported = False
tv.pairing_type = None
tv.powerstate = None
with patch(
"homeassistant.components.philips_js.config_flow.PhilipsTV", return_value=tv

View File

@ -1,12 +1,21 @@
"""Test the Philips TV config flow."""
from unittest.mock import patch
from unittest.mock import ANY, patch
from haphilipsjs import PairingFailure
from pytest import fixture
from homeassistant import config_entries
from homeassistant.components.philips_js.const import DOMAIN
from . import MOCK_CONFIG, MOCK_USERINPUT
from . import (
MOCK_CONFIG,
MOCK_CONFIG_PAIRED,
MOCK_IMPORT,
MOCK_PASSWORD,
MOCK_SYSTEM_UNPAIRED,
MOCK_USERINPUT,
MOCK_USERNAME,
)
@fixture(autouse=True)
@ -27,12 +36,26 @@ def mock_setup_entry():
yield mock_setup_entry
@fixture
async def mock_tv_pairable(mock_tv):
"""Return a mock tv that is pariable."""
mock_tv.system = MOCK_SYSTEM_UNPAIRED
mock_tv.pairing_type = "digest_auth_pairing"
mock_tv.api_version = 6
mock_tv.api_version_detected = 6
mock_tv.secured_transport = True
mock_tv.pairRequest.return_value = {}
mock_tv.pairGrant.return_value = MOCK_USERNAME, MOCK_PASSWORD
return mock_tv
async def test_import(hass, mock_setup, mock_setup_entry):
"""Test we get an item on import."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=MOCK_USERINPUT,
data=MOCK_IMPORT,
)
assert result["type"] == "create_entry"
@ -47,7 +70,7 @@ async def test_import_exist(hass, mock_config_entry):
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data=MOCK_USERINPUT,
data=MOCK_IMPORT,
)
assert result["type"] == "abort"
@ -103,3 +126,116 @@ async def test_form_unexpected_error(hass, mock_tv):
assert result["type"] == "form"
assert result["errors"] == {"base": "unknown"}
async def test_pairing(hass, mock_tv_pairable, mock_setup, mock_setup_entry):
"""Test we get the form."""
mock_tv = mock_tv_pairable
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
MOCK_USERINPUT,
)
assert result["type"] == "form"
assert result["errors"] == {}
mock_tv.setTransport.assert_called_with(True)
mock_tv.pairRequest.assert_called()
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"pin": "1234"}
)
assert result == {
"flow_id": ANY,
"type": "create_entry",
"description": None,
"description_placeholders": None,
"handler": "philips_js",
"result": ANY,
"title": "55PUS7181/12 (ABCDEFGHIJKLF)",
"data": MOCK_CONFIG_PAIRED,
"version": 1,
}
await hass.async_block_till_done()
assert len(mock_setup.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
async def test_pair_request_failed(
hass, mock_tv_pairable, mock_setup, mock_setup_entry
):
"""Test we get the form."""
mock_tv = mock_tv_pairable
mock_tv.pairRequest.side_effect = PairingFailure({})
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
MOCK_USERINPUT,
)
assert result == {
"flow_id": ANY,
"description_placeholders": {"error_id": None},
"handler": "philips_js",
"reason": "pairing_failure",
"type": "abort",
}
async def test_pair_grant_failed(hass, mock_tv_pairable, mock_setup, mock_setup_entry):
"""Test we get the form."""
mock_tv = mock_tv_pairable
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == "form"
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
MOCK_USERINPUT,
)
assert result["type"] == "form"
assert result["errors"] == {}
mock_tv.setTransport.assert_called_with(True)
mock_tv.pairRequest.assert_called()
# Test with invalid pin
mock_tv.pairGrant.side_effect = PairingFailure({"error_id": "INVALID_PIN"})
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"pin": "1234"}
)
assert result["type"] == "form"
assert result["errors"] == {"pin": "invalid_pin"}
# Test with unexpected failure
mock_tv.pairGrant.side_effect = PairingFailure({})
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"pin": "1234"}
)
assert result == {
"flow_id": ANY,
"description_placeholders": {"error_id": None},
"handler": "philips_js",
"reason": "pairing_failure",
"type": "abort",
}

View File

@ -33,9 +33,13 @@ async def test_get_triggers(hass, mock_device):
assert_lists_same(triggers, expected_triggers)
async def test_if_fires_on_turn_on_request(hass, calls, mock_entity, mock_device):
async def test_if_fires_on_turn_on_request(
hass, calls, mock_tv, mock_entity, mock_device
):
"""Test for turn_on and turn_off triggers firing."""
mock_tv.on = False
assert await async_setup_component(
hass,
automation.DOMAIN,