Change HEOS component library and add basic config flow (#22517)

* Update heos lib

* Update requirements files

* Removed unecessary mock_coro usage

* Remove assert_called_once usage

* Updates from review feedback

* Remove extra param to error format
This commit is contained in:
Andrew Sayre 2019-03-29 23:10:00 -05:00 committed by Paulus Schoutsen
parent fe8e51e2e9
commit 1bfe86b30d
15 changed files with 670 additions and 126 deletions

View File

@ -240,7 +240,6 @@ omit =
homeassistant/components/hikvisioncam/switch.py
homeassistant/components/hipchat/notify.py
homeassistant/components/hitron_coda/device_tracker.py
homeassistant/components/heos/*
homeassistant/components/hive/*
homeassistant/components/hlk_sw16/*
homeassistant/components/homekit_controller/*

View File

@ -131,6 +131,7 @@ homeassistant/components/gtfs/sensor.py @robbiet480
# H
homeassistant/components/harmony/* @ehendrix23
homeassistant/components/heos/* @andrewsayre
homeassistant/components/hikvision/binary_sensor.py @mezz64
homeassistant/components/history_graph/* @andrey-git
homeassistant/components/hive/* @Rendili @KJonline

View File

@ -0,0 +1,5 @@
{
"config": {
"title": "Heos"
}
}

View File

@ -1,5 +1,4 @@
"""Denon HEOS Media Player."""
import asyncio
import logging
@ -7,13 +6,15 @@ import voluptuous as vol
from homeassistant.components.media_player.const import (
DOMAIN as MEDIA_PLAYER_DOMAIN)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_HOST, EVENT_HOMEASSISTANT_STOP
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.typing import ConfigType, HomeAssistantType
DOMAIN = 'heos'
REQUIREMENTS = ['aioheos==0.4.0']
from .config_flow import format_title
from .const import DATA_CONTROLLER, DOMAIN
REQUIREMENTS = ['pyheos==0.2.0']
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
@ -26,27 +27,66 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup(hass: HomeAssistantType, config: ConfigType):
"""Set up the HEOS component."""
from aioheos import AioHeosController
host = config[DOMAIN][CONF_HOST]
controller = AioHeosController(hass.loop, host)
try:
await asyncio.wait_for(controller.connect(), timeout=5.0)
except asyncio.TimeoutError:
_LOGGER.error('Timeout during setup.')
return False
async def controller_close(event):
"""Close connection when HASS shutsdown."""
await controller.close()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, controller_close)
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][MEDIA_PLAYER_DOMAIN] = controller
hass.async_create_task(async_load_platform(
hass, MEDIA_PLAYER_DOMAIN, DOMAIN, {}, config))
entries = hass.config_entries.async_entries(DOMAIN)
if not entries:
# Create new entry based on config
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN, context={'source': 'import'},
data={CONF_HOST: host}))
else:
# Check if host needs to be updated
entry = entries[0]
if entry.data[CONF_HOST] != host:
entry.data[CONF_HOST] = host
entry.title = format_title(host)
hass.config_entries.async_update_entry(entry)
return True
async def async_setup_entry(hass: HomeAssistantType, entry: ConfigEntry):
"""Initialize config entry which represents the HEOS controller."""
from pyheos import Heos
host = entry.data[CONF_HOST]
# Setting all_progress_events=False ensures that we only receive a
# media position update upon start of playback or when media changes
controller = Heos(host, all_progress_events=False)
try:
await controller.connect(auto_reconnect=True)
# Auto reconnect only operates if initial connection was successful.
except (asyncio.TimeoutError, ConnectionError) as error:
await controller.disconnect()
_LOGGER.exception("Unable to connect to controller %s: %s",
host, type(error).__name__)
return False
async def disconnect_controller(event):
await controller.disconnect()
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, disconnect_controller)
try:
players = await controller.get_players()
except (asyncio.TimeoutError, ConnectionError) as error:
await controller.disconnect()
_LOGGER.exception("Unable to retrieve players: %s",
type(error).__name__)
return False
hass.data[DOMAIN] = {
DATA_CONTROLLER: controller,
MEDIA_PLAYER_DOMAIN: players
}
hass.async_create_task(hass.config_entries.async_forward_entry_setup(
entry, MEDIA_PLAYER_DOMAIN))
return True
async def async_unload_entry(hass: HomeAssistantType, entry: ConfigEntry):
"""Unload a config entry."""
controller = hass.data[DOMAIN][DATA_CONTROLLER]
await controller.disconnect()
hass.data.pop(DOMAIN)
return await hass.config_entries.async_forward_entry_unload(
entry, MEDIA_PLAYER_DOMAIN)

View File

@ -0,0 +1,25 @@
"""Config flow to configure Heos."""
from homeassistant import config_entries
from homeassistant.const import CONF_HOST
from .const import DOMAIN
def format_title(host: str) -> str:
"""Format the title for config entries."""
return "Controller ({})".format(host)
@config_entries.HANDLERS.register(DOMAIN)
class HeosFlowHandler(config_entries.ConfigFlow):
"""Define a flow for HEOS."""
VERSION = 1
CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
async def async_step_import(self, user_input=None):
"""Occurs when an entry is setup through config."""
host = user_input[CONF_HOST]
return self.async_create_entry(
title=format_title(host),
data={CONF_HOST: host})

View File

@ -0,0 +1,4 @@
"""Const for the HEOS integration."""
DATA_CONTROLLER = "controller"
DOMAIN = 'heos'

View File

@ -1,42 +1,35 @@
"""Denon HEOS Media Player."""
from functools import reduce
from operator import ior
from homeassistant.components.media_player import MediaPlayerDevice
from homeassistant.components.media_player.const import (
DOMAIN, MEDIA_TYPE_MUSIC, SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PLAY,
SUPPORT_PLAY_MEDIA, SUPPORT_PREVIOUS_TRACK, SUPPORT_STOP,
SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_STEP)
DOMAIN, MEDIA_TYPE_MUSIC, SUPPORT_CLEAR_PLAYLIST, SUPPORT_NEXT_TRACK,
SUPPORT_PAUSE, SUPPORT_PLAY, SUPPORT_PREVIOUS_TRACK, SUPPORT_SHUFFLE_SET,
SUPPORT_STOP, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET, SUPPORT_VOLUME_STEP)
from homeassistant.const import STATE_IDLE, STATE_PAUSED, STATE_PLAYING
from homeassistant.util.dt import utcnow
from . import DOMAIN as HEOS_DOMAIN
from .const import DOMAIN as HEOS_DOMAIN
DEPENDENCIES = ["heos"]
DEPENDENCIES = ['heos']
SUPPORT_HEOS = (
SUPPORT_PLAY
| SUPPORT_STOP
| SUPPORT_PAUSE
| SUPPORT_PLAY_MEDIA
| SUPPORT_PREVIOUS_TRACK
| SUPPORT_NEXT_TRACK
| SUPPORT_VOLUME_MUTE
| SUPPORT_VOLUME_SET
| SUPPORT_VOLUME_STEP
)
PLAY_STATE_TO_STATE = {
"play": STATE_PLAYING,
"pause": STATE_PAUSED,
"stop": STATE_IDLE,
}
BASE_SUPPORTED_FEATURES = SUPPORT_VOLUME_MUTE | SUPPORT_VOLUME_SET | \
SUPPORT_VOLUME_STEP | SUPPORT_CLEAR_PLAYLIST | \
SUPPORT_SHUFFLE_SET
async def async_setup_platform(hass, config, async_add_devices,
discover_info=None):
"""Set up the HEOS platform."""
controller = hass.data[HEOS_DOMAIN][DOMAIN]
players = controller.get_players()
devices = [HeosMediaPlayer(p) for p in players]
async_add_devices(devices, True)
async def async_setup_platform(
hass, config, async_add_entities, discovery_info=None):
"""Platform uses config entry setup."""
pass
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Add binary sensors for a config entry."""
players = hass.data[HEOS_DOMAIN][DOMAIN]
devices = [HeosMediaPlayer(player) for player in players.values()]
async_add_entities(devices, True)
class HeosMediaPlayer(MediaPlayerDevice):
@ -44,109 +37,225 @@ class HeosMediaPlayer(MediaPlayerDevice):
def __init__(self, player):
"""Initialize."""
from pyheos import const
self._media_position_updated_at = None
self._player = player
self._signals = []
self._supported_features = BASE_SUPPORTED_FEATURES
self._play_state_to_state = {
const.PLAY_STATE_PLAY: STATE_PLAYING,
const.PLAY_STATE_STOP: STATE_IDLE,
const.PLAY_STATE_PAUSE: STATE_PAUSED
}
self._control_to_support = {
const.CONTROL_PLAY: SUPPORT_PLAY,
const.CONTROL_PAUSE: SUPPORT_PAUSE,
const.CONTROL_STOP: SUPPORT_STOP,
const.CONTROL_PLAY_PREVIOUS: SUPPORT_PREVIOUS_TRACK,
const.CONTROL_PLAY_NEXT: SUPPORT_NEXT_TRACK
}
def _update_state(self):
self.async_schedule_update_ha_state()
async def _controller_event(self, event):
"""Handle controller event."""
from pyheos import const
if event == const.EVENT_PLAYERS_CHANGED:
await self.async_update_ha_state(True)
async def async_update(self):
"""Update the player."""
self._player.request_update()
async def _heos_event(self, event):
"""Handle connection event."""
await self.async_update_ha_state(True)
async def _player_update(self, player_id, event):
"""Handle player attribute updated."""
from pyheos import const
if self._player.player_id != player_id:
return
if event == const.EVENT_PLAYER_NOW_PLAYING_PROGRESS:
self._media_position_updated_at = utcnow()
await self.async_update_ha_state(True)
async def async_added_to_hass(self):
"""Device added to hass."""
self._player.state_change_callback = self._update_state
from pyheos import const
# Update state when attributes of the player change
self._signals.append(self._player.heos.dispatcher.connect(
const.SIGNAL_PLAYER_EVENT, self._player_update))
# Update state when available players change
self._signals.append(self._player.heos.dispatcher.connect(
const.SIGNAL_CONTROLLER_EVENT, self._controller_event))
# Update state upon connect/disconnects
self._signals.append(self._player.heos.dispatcher.connect(
const.SIGNAL_HEOS_EVENT, self._heos_event))
async def async_clear_playlist(self):
"""Clear players playlist."""
await self._player.clear_queue()
async def async_media_pause(self):
"""Send pause command."""
await self._player.pause()
async def async_media_play(self):
"""Send play command."""
await self._player.play()
async def async_media_previous_track(self):
"""Send previous track command."""
await self._player.play_previous()
async def async_media_next_track(self):
"""Send next track command."""
await self._player.play_next()
async def async_media_stop(self):
"""Send stop command."""
await self._player.stop()
async def async_mute_volume(self, mute):
"""Mute the volume."""
await self._player.set_mute(mute)
async def async_set_shuffle(self, shuffle):
"""Enable/disable shuffle mode."""
await self._player.set_play_mode(self._player.repeat, shuffle)
async def async_set_volume_level(self, volume):
"""Set volume level, range 0..1."""
await self._player.set_volume(volume * 100)
async def async_update(self):
"""Update supported features of the player."""
controls = self._player.now_playing_media.supported_controls
current_support = [self._control_to_support[control]
for control in controls]
self._supported_features = reduce(ior, current_support,
BASE_SUPPORTED_FEATURES)
async def async_will_remove_from_hass(self):
"""Disconnect the device when removed."""
for signal_remove in self._signals:
signal_remove()
self._signals.clear()
@property
def unique_id(self):
"""Get unique id of the player."""
return self._player.player_id
def available(self) -> bool:
"""Return True if the device is available."""
return self._player.available
@property
def name(self):
"""Return the name of the device."""
return self._player.name
def device_info(self) -> dict:
"""Get attributes about the device."""
return {
'identifiers': {
(DOMAIN, self._player.player_id)
},
'name': self._player.name,
'model': self._player.model,
'manufacturer': 'HEOS',
'sw_version': self._player.version
}
@property
def volume_level(self):
"""Volume level of the device (0..1)."""
volume = self._player.volume
return float(volume) / 100
def device_state_attributes(self) -> dict:
"""Get additional attribute about the state."""
return {
'media_album_id': self._player.now_playing_media.album_id,
'media_queue_id': self._player.now_playing_media.queue_id,
'media_source_id': self._player.now_playing_media.source_id,
'media_station': self._player.now_playing_media.station,
'media_type': self._player.now_playing_media.type
}
@property
def state(self):
"""Get state."""
return PLAY_STATE_TO_STATE.get(self._player.play_state)
def is_volume_muted(self) -> bool:
"""Boolean if volume is currently muted."""
return self._player.is_muted
@property
def should_poll(self):
"""No polling needed."""
return False
def media_album_name(self) -> str:
"""Album name of current playing media, music track only."""
return self._player.now_playing_media.album
@property
def media_content_type(self):
def media_artist(self) -> str:
"""Artist of current playing media, music track only."""
return self._player.now_playing_media.artist
@property
def media_content_id(self) -> str:
"""Content ID of current playing media."""
return self._player.now_playing_media.media_id
@property
def media_content_type(self) -> str:
"""Content type of current playing media."""
return MEDIA_TYPE_MUSIC
@property
def media_artist(self):
"""Artist of current playing media."""
return self._player.media_artist
def media_duration(self):
"""Duration of current playing media in seconds."""
duration = self._player.now_playing_media.duration
if isinstance(duration, int):
return duration / 1000
return None
@property
def media_title(self):
"""Album name of current playing media."""
return self._player.media_title
def media_position(self):
"""Position of current playing media in seconds."""
# Some media doesn't have duration but reports position, return None
if not self._player.now_playing_media.duration:
return None
return self._player.now_playing_media.current_position / 1000
@property
def media_album_name(self):
"""Album name of current playing media."""
return self._player.media_album
def media_position_updated_at(self):
"""When was the position of the current playing media valid."""
# Some media doesn't have duration but reports position, return None
if not self._player.now_playing_media.duration:
return None
return self._media_position_updated_at
@property
def media_image_url(self):
"""Return the image url of current playing media."""
return self._player.media_image_url
def media_image_url(self) -> str:
"""Image url of current playing media."""
return self._player.now_playing_media.image_url
@property
def media_content_id(self):
"""Return the content ID of current playing media."""
return self._player.media_id
def media_title(self) -> str:
"""Title of current playing media."""
return self._player.now_playing_media.song
@property
def is_volume_muted(self):
"""Boolean if volume is currently muted."""
return self._player.mute == "on"
async def async_mute_volume(self, mute):
"""Mute volume."""
self._player.set_mute(mute)
async def async_media_next_track(self):
"""Go TO next track."""
self._player.play_next()
async def async_media_previous_track(self):
"""Go TO previous track."""
self._player.play_previous()
def name(self) -> str:
"""Return the name of the device."""
return self._player.name
@property
def supported_features(self):
"""Flag of media commands that are supported."""
return SUPPORT_HEOS
def should_poll(self) -> bool:
"""No polling needed for this device."""
return False
async def async_set_volume_level(self, volume):
"""Set volume level, range 0..1."""
self._player.set_volume(volume * 100)
@property
def shuffle(self) -> bool:
"""Boolean if shuffle is enabled."""
return self._player.shuffle
async def async_media_play(self):
"""Play media player."""
self._player.play()
@property
def state(self) -> str:
"""State of the player."""
return self._play_state_to_state[self._player.state]
async def async_media_stop(self):
"""Stop media player."""
self._player.stop()
@property
def supported_features(self) -> int:
"""Flag media player features that are supported."""
return self._supported_features
async def async_media_pause(self):
"""Pause media player."""
self._player.pause()
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return str(self._player.player_id)
@property
def volume_level(self) -> float:
"""Volume level of the media player (0..1)."""
return self._player.volume / 100

View File

@ -0,0 +1,5 @@
{
"config": {
"title": "Heos"
}
}

View File

@ -123,9 +123,6 @@ aioftp==0.12.0
# homeassistant.components.harmony.remote
aioharmony==0.1.8
# homeassistant.components.heos
aioheos==0.4.0
# homeassistant.components.emulated_hue
# homeassistant.components.http
aiohttp_cors==0.7.0
@ -1076,6 +1073,9 @@ pygtt==1.1.2
# homeassistant.components.version.sensor
pyhaversion==2.0.3
# homeassistant.components.heos
pyheos==0.2.0
# homeassistant.components.hikvision.binary_sensor
pyhik==0.2.2

View File

@ -205,6 +205,9 @@ pydeconz==54
# homeassistant.components.zwave
pydispatcher==2.0.5
# homeassistant.components.heos
pyheos==0.2.0
# homeassistant.components.homematic
pyhomematic==0.1.58

View File

@ -90,6 +90,7 @@ TEST_REQUIREMENTS = (
'pyblackbird',
'pydeconz',
'pydispatcher',
'pyheos',
'pyhomematic',
'pylitejet',
'pymonoprice',

View File

@ -0,0 +1 @@
"""Tests for the Heos component."""

View File

@ -0,0 +1,67 @@
"""Configuration for HEOS tests."""
from asynctest.mock import Mock, patch as patch
from pyheos import Dispatcher, HeosPlayer, const
import pytest
from homeassistant.components.heos import DOMAIN
from homeassistant.const import CONF_HOST
from tests.common import MockConfigEntry
@pytest.fixture(name="config_entry")
def config_entry_fixture():
"""Create a mock HEOS config entry."""
return MockConfigEntry(domain=DOMAIN, data={CONF_HOST: '127.0.0.1'},
title='Controller (127.0.0.1)')
@pytest.fixture(name="controller")
def controller_fixture(players):
"""Create a mock Heos controller fixture."""
with patch("pyheos.Heos", autospec=True) as mock:
mock_heos = mock.return_value
mock_heos.get_players.return_value = players
mock_heos.players = players
yield mock_heos
@pytest.fixture(name="config")
def config_fixture():
"""Create hass config fixture."""
return {
DOMAIN: {CONF_HOST: '127.0.0.1'}
}
@pytest.fixture(name="players")
def player_fixture():
"""Create a mock HeosPlayer."""
player = Mock(HeosPlayer, autospec=True)
player.heos.dispatcher = Dispatcher()
player.player_id = 1
player.name = "Test Player"
player.model = "Test Model"
player.version = "1.0.0"
player.is_muted = False
player.available = True
player.state = const.PLAY_STATE_STOP
player.ip_address = "127.0.0.1"
player.network = "wired"
player.shuffle = False
player.repeat = const.REPEAT_OFF
player.volume = 25
player.now_playing_media.supported_controls = const.CONTROLS_ALL
player.now_playing_media.album_id = 1
player.now_playing_media.queue_id = 1
player.now_playing_media.source_id = 1
player.now_playing_media.station = "Station Name"
player.now_playing_media.type = "Station"
player.now_playing_media.album = "Album"
player.now_playing_media.artist = "Artist"
player.now_playing_media.media_id = "1"
player.now_playing_media.duration = None
player.now_playing_media.current_position = None
player.now_playing_media.image_url = "http://"
player.now_playing_media.song = "Song"
return {player.player_id: player}

View File

@ -0,0 +1,104 @@
"""Tests for the init module."""
import asyncio
from asynctest import patch
from homeassistant.components.heos import async_setup_entry, async_unload_entry
from homeassistant.components.heos.const import DATA_CONTROLLER, DOMAIN
from homeassistant.components.media_player.const import (
DOMAIN as MEDIA_PLAYER_DOMAIN)
from homeassistant.const import CONF_HOST
from homeassistant.setup import async_setup_component
async def test_async_setup_creates_entry(hass, config):
"""Test component setup creates entry from config."""
assert await async_setup_component(hass, DOMAIN, config)
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
entry = entries[0]
assert entry.title == 'Controller (127.0.0.1)'
assert entry.data == {CONF_HOST: '127.0.0.1'}
async def test_async_setup_updates_entry(hass, config_entry, config):
"""Test component setup updates entry from config."""
config[DOMAIN][CONF_HOST] = '127.0.0.2'
config_entry.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, config)
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
entry = entries[0]
assert entry.title == 'Controller (127.0.0.2)'
assert entry.data == {CONF_HOST: '127.0.0.2'}
async def test_async_setup_returns_true(hass, config_entry, config):
"""Test component setup updates entry from config."""
config_entry.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, config)
await hass.async_block_till_done()
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
assert entries[0] == config_entry
async def test_async_setup_entry_loads_platforms(
hass, config_entry, controller):
"""Test load connects to heos, retrieves players, and loads platforms."""
config_entry.add_to_hass(hass)
with patch.object(
hass.config_entries, 'async_forward_entry_setup') as forward_mock:
assert await async_setup_entry(hass, config_entry)
# Assert platforms loaded
await hass.async_block_till_done()
assert forward_mock.call_count == 1
assert controller.connect.call_count == 1
controller.disconnect.assert_not_called()
assert hass.data[DOMAIN] == {
DATA_CONTROLLER: controller,
MEDIA_PLAYER_DOMAIN: controller.players
}
async def test_async_setup_entry_connect_failure(
hass, config_entry, controller):
"""Test failure to connect does not load entry."""
config_entry.add_to_hass(hass)
errors = [ConnectionError, asyncio.TimeoutError]
for error in errors:
controller.connect.side_effect = error
assert not await async_setup_entry(hass, config_entry)
await hass.async_block_till_done()
assert controller.connect.call_count == 1
assert controller.disconnect.call_count == 1
controller.connect.reset_mock()
controller.disconnect.reset_mock()
async def test_async_setup_entry_player_failure(
hass, config_entry, controller):
"""Test failure to retrieve players does not load entry."""
config_entry.add_to_hass(hass)
errors = [ConnectionError, asyncio.TimeoutError]
for error in errors:
controller.get_players.side_effect = error
assert not await async_setup_entry(hass, config_entry)
await hass.async_block_till_done()
assert controller.connect.call_count == 1
assert controller.disconnect.call_count == 1
controller.connect.reset_mock()
controller.disconnect.reset_mock()
async def test_unload_entry(hass, config_entry, controller):
"""Test entries are unloaded correctly."""
hass.data[DOMAIN] = {DATA_CONTROLLER: controller}
with patch.object(hass.config_entries, 'async_forward_entry_unload',
return_value=True) as unload:
assert await async_unload_entry(hass, config_entry)
await hass.async_block_till_done()
assert controller.disconnect.call_count == 1
assert unload.call_count == 1

View File

@ -0,0 +1,180 @@
"""Tests for the Heos Media Player platform."""
from pyheos import const
from homeassistant.components.heos import media_player
from homeassistant.components.heos.const import DOMAIN
from homeassistant.components.media_player.const import (
ATTR_MEDIA_ALBUM_NAME, ATTR_MEDIA_ARTIST, ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE, ATTR_MEDIA_DURATION, ATTR_MEDIA_POSITION,
ATTR_MEDIA_POSITION_UPDATED_AT, ATTR_MEDIA_SHUFFLE, ATTR_MEDIA_TITLE,
ATTR_MEDIA_VOLUME_LEVEL, ATTR_MEDIA_VOLUME_MUTED,
DOMAIN as MEDIA_PLAYER_DOMAIN, MEDIA_TYPE_MUSIC, SERVICE_CLEAR_PLAYLIST,
SUPPORT_NEXT_TRACK, SUPPORT_PAUSE, SUPPORT_PLAY, SUPPORT_PREVIOUS_TRACK,
SUPPORT_STOP)
from homeassistant.const import (
ATTR_ENTITY_ID, ATTR_FRIENDLY_NAME, ATTR_SUPPORTED_FEATURES,
SERVICE_MEDIA_NEXT_TRACK, SERVICE_MEDIA_PAUSE, SERVICE_MEDIA_PLAY,
SERVICE_MEDIA_PREVIOUS_TRACK, SERVICE_MEDIA_STOP, SERVICE_SHUFFLE_SET,
SERVICE_VOLUME_MUTE, SERVICE_VOLUME_SET, STATE_IDLE, STATE_PLAYING,
STATE_UNAVAILABLE)
from homeassistant.setup import async_setup_component
async def setup_platform(hass, config_entry, config):
"""Set up the media player platform for testing."""
config_entry.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, config)
await hass.async_block_till_done()
async def test_async_setup_platform():
"""Test setup platform does nothing (it uses config entries)."""
await media_player.async_setup_platform(None, None, None)
async def test_state_attributes(hass, config_entry, config, controller):
"""Tests the state attributes."""
await setup_platform(hass, config_entry, config)
state = hass.states.get('media_player.test_player')
assert state.state == STATE_IDLE
assert state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.25
assert not state.attributes[ATTR_MEDIA_VOLUME_MUTED]
assert state.attributes[ATTR_MEDIA_CONTENT_ID] == "1"
assert state.attributes[ATTR_MEDIA_CONTENT_TYPE] == MEDIA_TYPE_MUSIC
assert ATTR_MEDIA_DURATION not in state.attributes
assert ATTR_MEDIA_POSITION not in state.attributes
assert state.attributes[ATTR_MEDIA_TITLE] == "Song"
assert state.attributes[ATTR_MEDIA_ARTIST] == "Artist"
assert state.attributes[ATTR_MEDIA_ALBUM_NAME] == "Album"
assert not state.attributes[ATTR_MEDIA_SHUFFLE]
assert state.attributes['media_album_id'] == 1
assert state.attributes['media_queue_id'] == 1
assert state.attributes['media_source_id'] == 1
assert state.attributes['media_station'] == "Station Name"
assert state.attributes['media_type'] == "Station"
assert state.attributes[ATTR_FRIENDLY_NAME] == "Test Player"
assert state.attributes[ATTR_SUPPORTED_FEATURES] == \
SUPPORT_PLAY | SUPPORT_PAUSE | SUPPORT_STOP | SUPPORT_NEXT_TRACK | \
SUPPORT_PREVIOUS_TRACK | media_player.BASE_SUPPORTED_FEATURES
async def test_updates_start_from_signals(
hass, config_entry, config, controller):
"""Tests dispatched signals update player."""
await setup_platform(hass, config_entry, config)
player = controller.players[1]
# Test player does not update for other players
player.state = const.PLAY_STATE_PLAY
player.heos.dispatcher.send(
const.SIGNAL_PLAYER_EVENT, 2,
const.EVENT_PLAYER_STATE_CHANGED)
await hass.async_block_till_done()
state = hass.states.get('media_player.test_player')
assert state.state == STATE_IDLE
# Test player_update standard events
player.state = const.PLAY_STATE_PLAY
player.heos.dispatcher.send(
const.SIGNAL_PLAYER_EVENT, player.player_id,
const.EVENT_PLAYER_STATE_CHANGED)
await hass.async_block_till_done()
state = hass.states.get('media_player.test_player')
assert state.state == STATE_PLAYING
# Test player_update progress events
player.now_playing_media.duration = 360000
player.now_playing_media.current_position = 1000
player.heos.dispatcher.send(
const.SIGNAL_PLAYER_EVENT, player.player_id,
const.EVENT_PLAYER_NOW_PLAYING_PROGRESS)
await hass.async_block_till_done()
state = hass.states.get('media_player.test_player')
assert state.attributes[ATTR_MEDIA_POSITION_UPDATED_AT] is not None
assert state.attributes[ATTR_MEDIA_DURATION] == 360
assert state.attributes[ATTR_MEDIA_POSITION] == 1
# Test controller player change updates
player.available = False
player.heos.dispatcher.send(
const.SIGNAL_CONTROLLER_EVENT, const.EVENT_PLAYERS_CHANGED)
await hass.async_block_till_done()
state = hass.states.get('media_player.test_player')
assert state.state == STATE_UNAVAILABLE
# Test heos events update
player.available = True
player.heos.dispatcher.send(
const.SIGNAL_HEOS_EVENT, const.EVENT_CONNECTED)
await hass.async_block_till_done()
state = hass.states.get('media_player.test_player')
assert state.state == STATE_PLAYING
async def test_services(hass, config_entry, config, controller):
"""Tests player commands."""
await setup_platform(hass, config_entry, config)
player = controller.players[1]
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_CLEAR_PLAYLIST,
{ATTR_ENTITY_ID: 'media_player.test_player'}, blocking=True)
assert player.clear_queue.call_count == 1
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_MEDIA_PAUSE,
{ATTR_ENTITY_ID: 'media_player.test_player'}, blocking=True)
assert player.pause.call_count == 1
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_MEDIA_PLAY,
{ATTR_ENTITY_ID: 'media_player.test_player'}, blocking=True)
assert player.play.call_count == 1
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_MEDIA_PREVIOUS_TRACK,
{ATTR_ENTITY_ID: 'media_player.test_player'}, blocking=True)
assert player.play_previous.call_count == 1
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_MEDIA_NEXT_TRACK,
{ATTR_ENTITY_ID: 'media_player.test_player'}, blocking=True)
assert player.play_next.call_count == 1
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_MEDIA_STOP,
{ATTR_ENTITY_ID: 'media_player.test_player'}, blocking=True)
assert player.stop.call_count == 1
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_VOLUME_MUTE,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_VOLUME_MUTED: True}, blocking=True)
player.set_mute.assert_called_once_with(True)
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_SHUFFLE_SET,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_SHUFFLE: True}, blocking=True)
player.set_play_mode.assert_called_once_with(player.repeat, True)
player.reset_mock()
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN, SERVICE_VOLUME_SET,
{ATTR_ENTITY_ID: 'media_player.test_player',
ATTR_MEDIA_VOLUME_LEVEL: 1}, blocking=True)
player.set_volume.assert_called_once_with(100)
async def test_unload_config_entry(hass, config_entry, config, controller):
"""Test the player is removed when the config entry is unloaded."""
await setup_platform(hass, config_entry, config)
await config_entry.async_unload(hass)
assert not hass.states.get('media_player.test_player')