Songpal code and test improvement (#35318)

* Use tests.async_mock instead of asynctest

* Remove unnecessary existence check

* Improve songpal service registering

* Add tests

* Seperate device api from entity api

* Improve disconnect log messages

* Improve tests

* Rename SongpalDevice to SongpalEntity

* Improve reconnecting

* Remove logging and sleep patch from tests

* Test unavailable state when disconnected

* Rename SongpalEntity.dev to _dev

* Add quality scale to manifest
This commit is contained in:
Xiaonan Shen 2020-05-09 19:30:34 -07:00 committed by GitHub
parent 3af3900581
commit 8994931ec4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 550 additions and 153 deletions

View File

@ -696,8 +696,6 @@ omit =
homeassistant/components/somfy/* homeassistant/components/somfy/*
homeassistant/components/somfy_mylink/* homeassistant/components/somfy_mylink/*
homeassistant/components/sonarr/sensor.py homeassistant/components/sonarr/sensor.py
homeassistant/components/songpal/__init__.py
homeassistant/components/songpal/media_player.py
homeassistant/components/sonos/* homeassistant/components/sonos/*
homeassistant/components/sony_projector/switch.py homeassistant/components/sony_projector/switch.py
homeassistant/components/spc/* homeassistant/components/spc/*

View File

@ -10,5 +10,6 @@
"st": "urn:schemas-sony-com:service:ScalarWebAPI:1", "st": "urn:schemas-sony-com:service:ScalarWebAPI:1",
"manufacturer": "Sony Corporation" "manufacturer": "Sony Corporation"
} }
] ],
"quality_scale": "gold"
} }

View File

@ -3,6 +3,7 @@ import asyncio
from collections import OrderedDict from collections import OrderedDict
import logging import logging
import async_timeout
from songpal import ( from songpal import (
ConnectChange, ConnectChange,
ContentChange, ContentChange,
@ -23,15 +24,13 @@ from homeassistant.components.media_player.const import (
SUPPORT_VOLUME_STEP, SUPPORT_VOLUME_STEP,
) )
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ( from homeassistant.const import CONF_NAME, EVENT_HOMEASSISTANT_STOP, STATE_OFF, STATE_ON
ATTR_ENTITY_ID,
CONF_NAME,
EVENT_HOMEASSISTANT_STOP,
STATE_OFF,
STATE_ON,
)
from homeassistant.exceptions import PlatformNotReady from homeassistant.exceptions import PlatformNotReady
from homeassistant.helpers import config_validation as cv, device_registry as dr from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
entity_platform,
)
from homeassistant.helpers.typing import HomeAssistantType from homeassistant.helpers.typing import HomeAssistantType
from .const import CONF_ENDPOINT, DOMAIN, SET_SOUND_SETTING from .const import CONF_ENDPOINT, DOMAIN, SET_SOUND_SETTING
@ -50,13 +49,7 @@ SUPPORT_SONGPAL = (
| SUPPORT_TURN_OFF | SUPPORT_TURN_OFF
) )
SET_SOUND_SCHEMA = vol.Schema( INITIAL_RETRY_DELAY = 10
{
vol.Optional(ATTR_ENTITY_ID): cv.entity_id,
vol.Required(PARAM_NAME): cv.string,
vol.Required(PARAM_VALUE): cv.string,
}
)
async def async_setup_platform( async def async_setup_platform(
@ -72,58 +65,38 @@ async def async_setup_entry(
hass: HomeAssistantType, config_entry: ConfigEntry, async_add_entities hass: HomeAssistantType, config_entry: ConfigEntry, async_add_entities
) -> None: ) -> None:
"""Set up songpal media player.""" """Set up songpal media player."""
if DOMAIN not in hass.data:
hass.data[DOMAIN] = {}
name = config_entry.data[CONF_NAME] name = config_entry.data[CONF_NAME]
endpoint = config_entry.data[CONF_ENDPOINT] endpoint = config_entry.data[CONF_ENDPOINT]
if endpoint in hass.data[DOMAIN]: device = Device(endpoint)
_LOGGER.debug("The endpoint exists already, skipping setup.")
return
device = SongpalDevice(name, endpoint)
try: try:
await device.initialize() async with async_timeout.timeout(
except SongpalException as ex: 10
_LOGGER.error("Unable to get methods from songpal: %s", ex) ): # set timeout to avoid blocking the setup process
await device.get_supported_methods()
except (SongpalException, asyncio.TimeoutError) as ex:
_LOGGER.warning("[%s(%s)] Unable to connect.", name, endpoint)
_LOGGER.debug("Unable to get methods from songpal: %s", ex)
raise PlatformNotReady raise PlatformNotReady
hass.data[DOMAIN][endpoint] = device songpal_entity = SongpalEntity(name, device)
async_add_entities([songpal_entity], True)
async_add_entities([device], True) platform = entity_platform.current_platform.get()
platform.async_register_entity_service(
async def async_service_handler(service): SET_SOUND_SETTING,
"""Service handler.""" {vol.Required(PARAM_NAME): cv.string, vol.Required(PARAM_VALUE): cv.string},
entity_id = service.data.get("entity_id", None) "async_set_sound_setting",
params = {
key: value for key, value in service.data.items() if key != ATTR_ENTITY_ID
}
for device in hass.data[DOMAIN].values():
if device.entity_id == entity_id or entity_id is None:
_LOGGER.debug(
"Calling %s (entity: %s) with params %s", service, entity_id, params
)
await device.async_set_sound_setting(
params[PARAM_NAME], params[PARAM_VALUE]
)
hass.services.async_register(
DOMAIN, SET_SOUND_SETTING, async_service_handler, schema=SET_SOUND_SCHEMA
) )
class SongpalDevice(MediaPlayerEntity): class SongpalEntity(MediaPlayerEntity):
"""Class representing a Songpal device.""" """Class representing a Songpal device."""
def __init__(self, name, endpoint, poll=False): def __init__(self, name, device):
"""Init.""" """Init."""
self._name = name self._name = name
self._endpoint = endpoint self._dev = device
self._poll = poll
self.dev = Device(self._endpoint)
self._sysinfo = None self._sysinfo = None
self._model = None self._model = None
@ -143,19 +116,15 @@ class SongpalDevice(MediaPlayerEntity):
@property @property
def should_poll(self): def should_poll(self):
"""Return True if the device should be polled.""" """Return True if the device should be polled."""
return self._poll return False
async def initialize(self): async def async_added_to_hass(self):
"""Initialize the device.""" """Run when entity is added to hass."""
await self.dev.get_supported_methods() await self.async_activate_websocket()
self._sysinfo = await self.dev.get_system_info()
interface_info = await self.dev.get_interface_information()
self._model = interface_info.modelName
async def async_will_remove_from_hass(self): async def async_will_remove_from_hass(self):
"""Run when entity will be removed from hass.""" """Run when entity will be removed from hass."""
self.hass.data[DOMAIN].pop(self._endpoint) await self._dev.stop_listen_notifications()
await self.dev.stop_listen_notifications()
async def async_activate_websocket(self): async def async_activate_websocket(self):
"""Activate websocket for listening if wanted.""" """Activate websocket for listening if wanted."""
@ -182,40 +151,48 @@ class SongpalDevice(MediaPlayerEntity):
self.async_write_ha_state() self.async_write_ha_state()
async def _try_reconnect(connect: ConnectChange): async def _try_reconnect(connect: ConnectChange):
_LOGGER.error( _LOGGER.warning(
"Got disconnected with %s, trying to reconnect.", connect.exception "[%s(%s)] Got disconnected, trying to reconnect.",
self.name,
self._dev.endpoint,
) )
_LOGGER.debug("Disconnected: %s", connect.exception)
self._available = False self._available = False
self.dev.clear_notification_callbacks()
self.async_write_ha_state() self.async_write_ha_state()
# Try to reconnect forever, a successful reconnect will initialize # Try to reconnect forever, a successful reconnect will initialize
# the websocket connection again. # the websocket connection again.
delay = 10 delay = INITIAL_RETRY_DELAY
while not self._available: while not self._available:
_LOGGER.debug("Trying to reconnect in %s seconds", delay) _LOGGER.debug("Trying to reconnect in %s seconds", delay)
await asyncio.sleep(delay) await asyncio.sleep(delay)
# We need to inform HA about the state in case we are coming
# back from a disconnected state.
await self.async_update_ha_state(force_refresh=True)
delay = min(2 * delay, 300)
_LOGGER.info("Reconnected to %s", self.name) try:
await self._dev.get_supported_methods()
except SongpalException as ex:
_LOGGER.debug("Failed to reconnect: %s", ex)
delay = min(2 * delay, 300)
else:
# We need to inform HA about the state in case we are coming
# back from a disconnected state.
await self.async_update_ha_state(force_refresh=True)
self.dev.on_notification(VolumeChange, _volume_changed) self.hass.loop.create_task(self._dev.listen_notifications())
self.dev.on_notification(ContentChange, _source_changed) _LOGGER.warning(
self.dev.on_notification(PowerChange, _power_changed) "[%s(%s)] Connection reestablished.", self.name, self._dev.endpoint
self.dev.on_notification(ConnectChange, _try_reconnect) )
async def listen_events(): self._dev.on_notification(VolumeChange, _volume_changed)
await self.dev.listen_notifications() self._dev.on_notification(ContentChange, _source_changed)
self._dev.on_notification(PowerChange, _power_changed)
self._dev.on_notification(ConnectChange, _try_reconnect)
async def handle_stop(event): async def handle_stop(event):
await self.dev.stop_listen_notifications() await self._dev.stop_listen_notifications()
self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, handle_stop) self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, handle_stop)
self.hass.loop.create_task(listen_events()) self.hass.loop.create_task(self._dev.listen_notifications())
@property @property
def name(self): def name(self):
@ -246,12 +223,20 @@ class SongpalDevice(MediaPlayerEntity):
async def async_set_sound_setting(self, name, value): async def async_set_sound_setting(self, name, value):
"""Change a setting on the device.""" """Change a setting on the device."""
await self.dev.set_sound_settings(name, value) _LOGGER.debug("Calling set_sound_setting with %s: %s", name, value)
await self._dev.set_sound_settings(name, value)
async def async_update(self): async def async_update(self):
"""Fetch updates from the device.""" """Fetch updates from the device."""
try: try:
volumes = await self.dev.get_volume_information() if self._sysinfo is None:
self._sysinfo = await self._dev.get_system_info()
if self._model is None:
interface_info = await self._dev.get_interface_information()
self._model = interface_info.modelName
volumes = await self._dev.get_volume_information()
if not volumes: if not volumes:
_LOGGER.error("Got no volume controls, bailing out") _LOGGER.error("Got no volume controls, bailing out")
self._available = False self._available = False
@ -269,11 +254,11 @@ class SongpalDevice(MediaPlayerEntity):
self._volume_control = volume self._volume_control = volume
self._is_muted = self._volume_control.is_muted self._is_muted = self._volume_control.is_muted
status = await self.dev.get_power() status = await self._dev.get_power()
self._state = status.status self._state = status.status
_LOGGER.debug("Got state: %s", status) _LOGGER.debug("Got state: %s", status)
inputs = await self.dev.get_inputs() inputs = await self._dev.get_inputs()
_LOGGER.debug("Got ins: %s", inputs) _LOGGER.debug("Got ins: %s", inputs)
self._sources = OrderedDict() self._sources = OrderedDict()
@ -286,9 +271,6 @@ class SongpalDevice(MediaPlayerEntity):
self._available = True self._available = True
# activate notifications if wanted
if not self._poll:
await self.hass.async_create_task(self.async_activate_websocket())
except SongpalException as ex: except SongpalException as ex:
_LOGGER.error("Unable to update: %s", ex) _LOGGER.error("Unable to update: %s", ex)
self._available = False self._available = False
@ -342,11 +324,11 @@ class SongpalDevice(MediaPlayerEntity):
async def async_turn_on(self): async def async_turn_on(self):
"""Turn the device on.""" """Turn the device on."""
return await self.dev.set_power(True) return await self._dev.set_power(True)
async def async_turn_off(self): async def async_turn_off(self):
"""Turn the device off.""" """Turn the device off."""
return await self.dev.set_power(False) return await self._dev.set_power(False)
async def async_mute_volume(self, mute): async def async_mute_volume(self, mute):
"""Mute or unmute the device.""" """Mute or unmute the device."""

View File

@ -1 +1,104 @@
"""Test the songpal integration.""" """Test the songpal integration."""
from songpal import SongpalException
from homeassistant.components.songpal.const import CONF_ENDPOINT
from homeassistant.const import CONF_NAME
from tests.async_mock import AsyncMock, MagicMock, patch
FRIENDLY_NAME = "name"
ENTITY_ID = f"media_player.{FRIENDLY_NAME}"
HOST = "0.0.0.0"
ENDPOINT = f"http://{HOST}:10000/sony"
MODEL = "model"
MAC = "mac"
SW_VERSION = "sw_ver"
CONF_DATA = {
CONF_NAME: FRIENDLY_NAME,
CONF_ENDPOINT: ENDPOINT,
}
def _create_mocked_device(throw_exception=False):
mocked_device = MagicMock()
type(mocked_device).get_supported_methods = AsyncMock(
side_effect=SongpalException("Unable to do POST request: ")
if throw_exception
else None
)
interface_info = MagicMock()
interface_info.modelName = MODEL
type(mocked_device).get_interface_information = AsyncMock(
return_value=interface_info
)
sys_info = MagicMock()
sys_info.macAddr = MAC
sys_info.version = SW_VERSION
type(mocked_device).get_system_info = AsyncMock(return_value=sys_info)
volume1 = MagicMock()
volume1.maxVolume = 100
volume1.minVolume = 0
volume1.volume = 50
volume1.is_muted = False
volume1.set_volume = AsyncMock()
volume1.set_mute = AsyncMock()
volume2 = MagicMock()
volume2.maxVolume = 100
volume2.minVolume = 0
volume2.volume = 20
volume2.is_muted = True
mocked_device.volume1 = volume1
type(mocked_device).get_volume_information = AsyncMock(
return_value=[volume1, volume2]
)
power = MagicMock()
power.status = True
type(mocked_device).get_power = AsyncMock(return_value=power)
input1 = MagicMock()
input1.title = "title1"
input1.uri = "uri1"
input1.active = False
input1.activate = AsyncMock()
mocked_device.input1 = input1
input2 = MagicMock()
input2.title = "title2"
input2.uri = "uri2"
input2.active = True
type(mocked_device).get_inputs = AsyncMock(return_value=[input1, input2])
type(mocked_device).set_power = AsyncMock()
type(mocked_device).set_sound_settings = AsyncMock()
type(mocked_device).listen_notifications = AsyncMock()
type(mocked_device).stop_listen_notifications = AsyncMock()
notification_callbacks = {}
mocked_device.notification_callbacks = notification_callbacks
def _on_notification(name, callback):
notification_callbacks[name] = callback
type(mocked_device).on_notification = MagicMock(side_effect=_on_notification)
type(mocked_device).clear_notification_callbacks = MagicMock()
return mocked_device
def _patch_config_flow_device(mocked_device):
return patch(
"homeassistant.components.songpal.config_flow.Device",
return_value=mocked_device,
)
def _patch_media_player_device(mocked_device):
return patch(
"homeassistant.components.songpal.media_player.Device",
return_value=mocked_device,
)

View File

@ -1,10 +1,6 @@
"""Test the songpal config flow.""" """Test the songpal config flow."""
import copy import copy
from asynctest import MagicMock, patch
from songpal import SongpalException
from songpal.containers import InterfaceInfo
from homeassistant.components import ssdp from homeassistant.components import ssdp
from homeassistant.components.songpal.const import CONF_ENDPOINT, DOMAIN from homeassistant.components.songpal.const import CONF_ENDPOINT, DOMAIN
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_SSDP, SOURCE_USER from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_SSDP, SOURCE_USER
@ -15,13 +11,20 @@ from homeassistant.data_entry_flow import (
RESULT_TYPE_FORM, RESULT_TYPE_FORM,
) )
from . import (
CONF_DATA,
ENDPOINT,
FRIENDLY_NAME,
HOST,
MODEL,
_create_mocked_device,
_patch_config_flow_device,
)
from tests.async_mock import patch
from tests.common import MockConfigEntry from tests.common import MockConfigEntry
UDN = "uuid:1234" UDN = "uuid:1234"
FRIENDLY_NAME = "friendly name"
HOST = "0.0.0.0"
ENDPOINT = f"http://{HOST}:10000/sony"
MODEL = "model"
SSDP_DATA = { SSDP_DATA = {
ssdp.ATTR_UPNP_UDN: UDN, ssdp.ATTR_UPNP_UDN: UDN,
@ -35,52 +38,6 @@ SSDP_DATA = {
}, },
} }
CONF_DATA = {
CONF_NAME: FRIENDLY_NAME,
CONF_ENDPOINT: ENDPOINT,
}
async def _async_return_value():
pass
def _get_supported_methods(throw_exception):
def get_supported_methods():
if throw_exception:
raise SongpalException("Unable to do POST request: ")
return _async_return_value()
return get_supported_methods
async def _get_interface_information():
return InterfaceInfo(
productName="product name",
modelName=MODEL,
productCategory="product category",
interfaceVersion="interface version",
serverName="server name",
)
def _create_mocked_device(throw_exception=False):
mocked_device = MagicMock()
type(mocked_device).get_supported_methods = MagicMock(
side_effect=_get_supported_methods(throw_exception)
)
type(mocked_device).get_interface_information = MagicMock(
side_effect=_get_interface_information
)
return mocked_device
def _patch_config_flow_device(mocked_device):
return patch(
"homeassistant.components.songpal.config_flow.Device",
return_value=mocked_device,
)
def _flow_next(hass, flow_id): def _flow_next(hass, flow_id):
return next( return next(
@ -90,6 +47,12 @@ def _flow_next(hass, flow_id):
) )
def _patch_setup():
return patch(
"homeassistant.components.songpal.async_setup_entry", return_value=True,
)
async def test_flow_ssdp(hass): async def test_flow_ssdp(hass):
"""Test working ssdp flow.""" """Test working ssdp flow."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
@ -104,19 +67,20 @@ async def test_flow_ssdp(hass):
flow = _flow_next(hass, result["flow_id"]) flow = _flow_next(hass, result["flow_id"])
assert flow["context"]["unique_id"] == UDN assert flow["context"]["unique_id"] == UDN
result = await hass.config_entries.flow.async_configure( with _patch_setup():
result["flow_id"], user_input={} result = await hass.config_entries.flow.async_configure(
) result["flow_id"], user_input={}
assert result["type"] == RESULT_TYPE_CREATE_ENTRY )
assert result["title"] == FRIENDLY_NAME assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["data"] == CONF_DATA assert result["title"] == FRIENDLY_NAME
assert result["data"] == CONF_DATA
async def test_flow_user(hass): async def test_flow_user(hass):
"""Test working user initialized flow.""" """Test working user initialized flow."""
mocked_device = _create_mocked_device() mocked_device = _create_mocked_device()
with _patch_config_flow_device(mocked_device): with _patch_config_flow_device(mocked_device), _patch_setup():
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}, DOMAIN, context={"source": SOURCE_USER},
) )
@ -143,7 +107,7 @@ async def test_flow_import(hass):
"""Test working import flow.""" """Test working import flow."""
mocked_device = _create_mocked_device() mocked_device = _create_mocked_device()
with _patch_config_flow_device(mocked_device): with _patch_config_flow_device(mocked_device), _patch_setup():
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data=CONF_DATA DOMAIN, context={"source": SOURCE_IMPORT}, data=CONF_DATA
) )
@ -155,6 +119,22 @@ async def test_flow_import(hass):
mocked_device.get_interface_information.assert_not_called() mocked_device.get_interface_information.assert_not_called()
async def test_flow_import_without_name(hass):
"""Test import flow without optional name."""
mocked_device = _create_mocked_device()
with _patch_config_flow_device(mocked_device), _patch_setup():
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_IMPORT}, data={CONF_ENDPOINT: ENDPOINT}
)
assert result["type"] == RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MODEL
assert result["data"] == {CONF_NAME: MODEL, CONF_ENDPOINT: ENDPOINT}
mocked_device.get_supported_methods.assert_called_once()
mocked_device.get_interface_information.assert_called_once()
def _create_mock_config_entry(hass): def _create_mock_config_entry(hass):
MockConfigEntry(domain=DOMAIN, unique_id="uuid:0000", data=CONF_DATA,).add_to_hass( MockConfigEntry(domain=DOMAIN, unique_id="uuid:0000", data=CONF_DATA,).add_to_hass(
hass hass

View File

@ -0,0 +1,66 @@
"""Tests songpal setup."""
from homeassistant.components import songpal
from homeassistant.setup import async_setup_component
from . import (
CONF_DATA,
_create_mocked_device,
_patch_config_flow_device,
_patch_media_player_device,
)
from tests.async_mock import patch
from tests.common import MockConfigEntry
def _patch_media_setup():
"""Patch media_player.async_setup_entry."""
async def _async_return():
return True
return patch(
"homeassistant.components.songpal.media_player.async_setup_entry",
side_effect=_async_return,
)
async def test_setup_empty(hass):
"""Test setup without any configuration."""
with _patch_media_setup() as setup:
assert await async_setup_component(hass, songpal.DOMAIN, {}) is True
await hass.async_block_till_done()
setup.assert_not_called()
async def test_setup(hass):
"""Test setup the platform."""
mocked_device = _create_mocked_device()
with _patch_config_flow_device(mocked_device), _patch_media_setup() as setup:
assert (
await async_setup_component(
hass, songpal.DOMAIN, {songpal.DOMAIN: [CONF_DATA]}
)
is True
)
await hass.async_block_till_done()
mocked_device.get_supported_methods.assert_called_once()
setup.assert_called_once()
async def test_unload(hass):
"""Test unload entity."""
entry = MockConfigEntry(domain=songpal.DOMAIN, data=CONF_DATA)
entry.add_to_hass(hass)
mocked_device = _create_mocked_device()
with _patch_config_flow_device(mocked_device), _patch_media_player_device(
mocked_device
):
assert await async_setup_component(hass, songpal.DOMAIN, {}) is True
await hass.async_block_till_done()
mocked_device.listen_notifications.assert_called_once()
assert await songpal.async_unload_entry(hass, entry)
await hass.async_block_till_done()
mocked_device.stop_listen_notifications.assert_called_once()

View File

@ -0,0 +1,267 @@
"""Test songpal media_player."""
from datetime import timedelta
import logging
from songpal import (
ConnectChange,
ContentChange,
PowerChange,
SongpalException,
VolumeChange,
)
from homeassistant.components import media_player, songpal
from homeassistant.components.songpal.const import SET_SOUND_SETTING
from homeassistant.components.songpal.media_player import SUPPORT_SONGPAL
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from . import (
CONF_DATA,
CONF_ENDPOINT,
CONF_NAME,
ENDPOINT,
ENTITY_ID,
FRIENDLY_NAME,
MAC,
MODEL,
SW_VERSION,
_create_mocked_device,
_patch_media_player_device,
)
from tests.async_mock import AsyncMock, MagicMock, call, patch
from tests.common import MockConfigEntry, async_fire_time_changed
def _get_attributes(hass):
state = hass.states.get(ENTITY_ID)
return state.as_dict()["attributes"]
async def test_setup_platform(hass):
"""Test the legacy setup platform."""
mocked_device = _create_mocked_device(throw_exception=True)
with _patch_media_player_device(mocked_device):
await async_setup_component(
hass,
media_player.DOMAIN,
{
media_player.DOMAIN: [
{
"platform": songpal.DOMAIN,
CONF_NAME: FRIENDLY_NAME,
CONF_ENDPOINT: ENDPOINT,
}
],
},
)
await hass.async_block_till_done()
# No device is set up
mocked_device.assert_not_called()
all_states = hass.states.async_all()
assert len(all_states) == 0
async def test_setup_failed(hass, caplog):
"""Test failed to set up the entity."""
mocked_device = _create_mocked_device(throw_exception=True)
entry = MockConfigEntry(domain=songpal.DOMAIN, data=CONF_DATA)
entry.add_to_hass(hass)
with _patch_media_player_device(mocked_device):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 0
warning_records = [x for x in caplog.records if x.levelno == logging.WARNING]
assert len(warning_records) == 2
assert not any(x.levelno == logging.ERROR for x in caplog.records)
caplog.clear()
utcnow = dt_util.utcnow()
type(mocked_device).get_supported_methods = AsyncMock()
with _patch_media_player_device(mocked_device):
async_fire_time_changed(hass, utcnow + timedelta(seconds=30))
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 1
assert not any(x.levelno == logging.WARNING for x in caplog.records)
assert not any(x.levelno == logging.ERROR for x in caplog.records)
async def test_state(hass):
"""Test state of the entity."""
mocked_device = _create_mocked_device()
entry = MockConfigEntry(domain=songpal.DOMAIN, data=CONF_DATA)
entry.add_to_hass(hass)
with _patch_media_player_device(mocked_device):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
state = hass.states.get(ENTITY_ID)
assert state.name == FRIENDLY_NAME
assert state.state == STATE_ON
attributes = state.as_dict()["attributes"]
assert attributes["volume_level"] == 0.5
assert attributes["is_volume_muted"] is False
assert attributes["source_list"] == ["title1", "title2"]
assert attributes["source"] == "title2"
assert attributes["supported_features"] == SUPPORT_SONGPAL
device_registry = await dr.async_get_registry(hass)
device = device_registry.async_get_device(
identifiers={(songpal.DOMAIN, MAC)}, connections={}
)
assert device.connections == {(dr.CONNECTION_NETWORK_MAC, MAC)}
assert device.manufacturer == "Sony Corporation"
assert device.name == FRIENDLY_NAME
assert device.sw_version == SW_VERSION
assert device.model == MODEL
entity_registry = await er.async_get_registry(hass)
entity = entity_registry.async_get(ENTITY_ID)
assert entity.unique_id == MAC
async def test_services(hass):
"""Test services."""
mocked_device = _create_mocked_device()
entry = MockConfigEntry(domain=songpal.DOMAIN, data=CONF_DATA)
entry.add_to_hass(hass)
with _patch_media_player_device(mocked_device):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
async def _call(service, **argv):
await hass.services.async_call(
media_player.DOMAIN,
service,
{"entity_id": ENTITY_ID, **argv},
blocking=True,
)
await _call(media_player.SERVICE_TURN_ON)
await _call(media_player.SERVICE_TURN_OFF)
await _call(media_player.SERVICE_TOGGLE)
assert mocked_device.set_power.call_count == 3
mocked_device.set_power.assert_has_calls([call(True), call(False), call(False)])
await _call(media_player.SERVICE_VOLUME_SET, volume_level=0.6)
await _call(media_player.SERVICE_VOLUME_UP)
await _call(media_player.SERVICE_VOLUME_DOWN)
assert mocked_device.volume1.set_volume.call_count == 3
mocked_device.volume1.set_volume.assert_has_calls(
[call(60), call("+1"), call("-1")]
)
await _call(media_player.SERVICE_VOLUME_MUTE, is_volume_muted=True)
mocked_device.volume1.set_mute.assert_called_once_with(True)
await _call(media_player.SERVICE_SELECT_SOURCE, source="none")
mocked_device.input1.activate.assert_not_called()
await _call(media_player.SERVICE_SELECT_SOURCE, source="title1")
mocked_device.input1.activate.assert_called_once()
await hass.services.async_call(
songpal.DOMAIN,
SET_SOUND_SETTING,
{"entity_id": ENTITY_ID, "name": "name", "value": "value"},
blocking=True,
)
mocked_device.set_sound_settings.assert_called_once_with("name", "value")
mocked_device.set_sound_settings.reset_mock()
mocked_device2 = _create_mocked_device()
sys_info = MagicMock()
sys_info.macAddr = "mac2"
sys_info.version = SW_VERSION
type(mocked_device2).get_system_info = AsyncMock(return_value=sys_info)
entry2 = MockConfigEntry(
domain=songpal.DOMAIN, data={CONF_NAME: "d2", CONF_ENDPOINT: ENDPOINT}
)
entry2.add_to_hass(hass)
with _patch_media_player_device(mocked_device2):
await hass.config_entries.async_setup(entry2.entry_id)
await hass.async_block_till_done()
await hass.services.async_call(
songpal.DOMAIN,
SET_SOUND_SETTING,
{"entity_id": "all", "name": "name", "value": "value"},
blocking=True,
)
mocked_device.set_sound_settings.assert_called_once_with("name", "value")
mocked_device2.set_sound_settings.assert_called_once_with("name", "value")
async def test_websocket_events(hass):
"""Test websocket events."""
mocked_device = _create_mocked_device()
entry = MockConfigEntry(domain=songpal.DOMAIN, data=CONF_DATA)
entry.add_to_hass(hass)
with _patch_media_player_device(mocked_device):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
mocked_device.listen_notifications.assert_called_once()
assert mocked_device.on_notification.call_count == 4
notification_callbacks = mocked_device.notification_callbacks
volume_change = MagicMock()
volume_change.mute = True
volume_change.volume = 20
await notification_callbacks[VolumeChange](volume_change)
attributes = _get_attributes(hass)
assert attributes["is_volume_muted"] is True
assert attributes["volume_level"] == 0.2
content_change = MagicMock()
content_change.is_input = False
content_change.uri = "uri1"
await notification_callbacks[ContentChange](content_change)
assert _get_attributes(hass)["source"] == "title2"
content_change.is_input = True
await notification_callbacks[ContentChange](content_change)
assert _get_attributes(hass)["source"] == "title1"
power_change = MagicMock()
power_change.status = False
await notification_callbacks[PowerChange](power_change)
assert hass.states.get(ENTITY_ID).state == STATE_OFF
async def test_disconnected(hass, caplog):
"""Test disconnected behavior."""
mocked_device = _create_mocked_device()
entry = MockConfigEntry(domain=songpal.DOMAIN, data=CONF_DATA)
entry.add_to_hass(hass)
with _patch_media_player_device(mocked_device):
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
async def _assert_state():
state = hass.states.get(ENTITY_ID)
assert state.state == STATE_UNAVAILABLE
connect_change = MagicMock()
connect_change.exception = "disconnected"
type(mocked_device).get_supported_methods = AsyncMock(
side_effect=[SongpalException(""), SongpalException(""), _assert_state]
)
notification_callbacks = mocked_device.notification_callbacks
with patch("homeassistant.components.songpal.media_player.INITIAL_RETRY_DELAY", 0):
await notification_callbacks[ConnectChange](connect_change)
warning_records = [x for x in caplog.records if x.levelno == logging.WARNING]
assert len(warning_records) == 2
assert warning_records[0].message.endswith("Got disconnected, trying to reconnect.")
assert warning_records[1].message.endswith("Connection reestablished.")
assert not any(x.levelno == logging.ERROR for x in caplog.records)