Add tests for media_player to bluesound integration (#125864)

This commit is contained in:
Louis Christ 2024-10-24 15:42:25 +02:00 committed by GitHub
parent 92e1fa4d3a
commit 93e6c9e5a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 702 additions and 127 deletions

View File

@ -7,7 +7,7 @@ from asyncio import CancelledError, Task
from contextlib import suppress from contextlib import suppress
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
from typing import TYPE_CHECKING, Any, NamedTuple, cast from typing import TYPE_CHECKING, Any, NamedTuple
from pyblu import Input, Player, Preset, Status, SyncStatus from pyblu import Input, Player, Preset, Status, SyncStatus
from pyblu.errors import PlayerUnreachableError from pyblu.errors import PlayerUnreachableError
@ -555,6 +555,11 @@ class BluesoundPlayer(MediaPlayerEntity):
"""Return the device name as returned by the device.""" """Return the device name as returned by the device."""
return self._bluesound_device_name return self._bluesound_device_name
@property
def sync_status(self) -> SyncStatus:
"""Return the sync status."""
return self._sync_status
@property @property
def source_list(self) -> list[str] | None: def source_list(self) -> list[str] | None:
"""List of available input sources.""" """List of available input sources."""
@ -693,7 +698,7 @@ class BluesoundPlayer(MediaPlayerEntity):
reverse=True, reverse=True,
) )
return [ return [
cast(str, entity.name) entity.sync_status.name
for entity in sorted_entities for entity in sorted_entities
if entity.bluesound_device_name in device_group if entity.bluesound_device_name in device_group
] ]

View File

@ -1,71 +1,124 @@
"""Common fixtures for the Bluesound tests.""" """Common fixtures for the Bluesound tests."""
from collections.abc import Generator from collections.abc import AsyncGenerator, Generator
from dataclasses import dataclass
import ipaddress
from typing import Any
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
from pyblu import Status, SyncStatus from pyblu import Input, Player, Preset, Status, SyncStatus
import pytest import pytest
from homeassistant.components.bluesound.const import DOMAIN from homeassistant.components.bluesound.const import DOMAIN
from homeassistant.const import CONF_HOST, CONF_PORT from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from .utils import LongPollingMock
from tests.common import MockConfigEntry from tests.common import MockConfigEntry
@pytest.fixture @dataclass
def sync_status() -> SyncStatus: class PlayerMockData:
"""Return a sync status object.""" """Container for player mock data."""
return SyncStatus(
etag="etag", host: str
id="1.1.1.1:11000", player: AsyncMock
mac="00:11:22:33:44:55", status_long_polling_mock: LongPollingMock[Status]
name="player-name", sync_status_long_polling_mock: LongPollingMock[SyncStatus]
image="invalid_url",
initialized=True, @staticmethod
brand="brand", async def generate(host: str) -> "PlayerMockData":
model="model", """Generate player mock data."""
model_name="model-name", host_ip = ipaddress.ip_address(host)
volume_db=0.5, assert host_ip.version == 4
volume=50, mac_parts = [0xFF, 0xFF, *host_ip.packed]
group=None, mac = ":".join(f"{x:02X}" for x in mac_parts)
master=None,
slaves=None, player_name = f"player-name{host.replace('.', '')}"
zone=None,
zone_master=None, player = await AsyncMock(spec=Player)()
zone_slave=None, player.__aenter__.return_value = player
mute_volume_db=None,
mute_volume=None, status_long_polling_mock = LongPollingMock(
) Status(
etag="etag",
input_id=None,
service=None,
state="play",
shuffle=False,
album="album",
artist="artist",
name="song",
image=None,
volume=10,
volume_db=22.3,
mute=False,
mute_volume=None,
mute_volume_db=None,
seconds=2,
total_seconds=123.1,
can_seek=False,
sleep=0,
group_name=None,
group_volume=None,
indexing=False,
stream_url=None,
)
)
sync_status_long_polling_mock = LongPollingMock(
SyncStatus(
etag="etag",
id=f"{host}:11000",
mac=mac,
name=player_name,
image="invalid_url",
initialized=True,
brand="brand",
model="model",
model_name="model-name",
volume_db=0.5,
volume=50,
group=None,
master=None,
slaves=None,
zone=None,
zone_master=None,
zone_slave=None,
mute_volume_db=None,
mute_volume=None,
)
)
player.status.side_effect = status_long_polling_mock.side_effect()
player.sync_status.side_effect = sync_status_long_polling_mock.side_effect()
player.inputs = AsyncMock(
return_value=[
Input("1", "input1", "image1", "url1"),
Input("2", "input2", "image2", "url2"),
]
)
player.presets = AsyncMock(
return_value=[
Preset("preset1", "1", "url1", "image1", None),
Preset("preset2", "2", "url2", "image2", None),
]
)
return PlayerMockData(
host, player, status_long_polling_mock, sync_status_long_polling_mock
)
@pytest.fixture @dataclass
def status() -> Status: class PlayerMocks:
"""Return a status object.""" """Container for mocks."""
return Status(
etag="etag", player_data: PlayerMockData
input_id=None, player_data_secondary: PlayerMockData
service=None, player_data_for_already_configured: PlayerMockData
state="playing",
shuffle=False,
album=None,
artist=None,
name=None,
image=None,
volume=10,
volume_db=22.3,
mute=False,
mute_volume=None,
mute_volume_db=None,
seconds=2,
total_seconds=123.1,
can_seek=False,
sleep=0,
group_name=None,
group_volume=None,
indexing=False,
stream_url=None,
)
@pytest.fixture @pytest.fixture
@ -78,24 +131,76 @@ def mock_setup_entry() -> Generator[AsyncMock]:
@pytest.fixture @pytest.fixture
def mock_config_entry(hass: HomeAssistant) -> MockConfigEntry: def config_entry() -> MockConfigEntry:
"""Return a mocked config entry.""" """Return a mocked config entry."""
mock_entry = MockConfigEntry( return MockConfigEntry(
domain=DOMAIN, domain=DOMAIN,
data={ data={
CONF_HOST: "1.1.1.2", CONF_HOST: "1.1.1.1",
CONF_PORT: 11000, CONF_PORT: 11000,
}, },
unique_id="00:11:22:33:44:55-11000", unique_id="ff:ff:01:01:01:01-11000",
) )
mock_entry.add_to_hass(hass)
return mock_entry
@pytest.fixture @pytest.fixture
def mock_player(status: Status) -> Generator[AsyncMock]: def config_entry_secondary() -> MockConfigEntry:
"""Return a mocked config entry."""
return MockConfigEntry(
domain=DOMAIN,
data={
CONF_HOST: "2.2.2.2",
CONF_PORT: 11000,
},
unique_id="ff:ff:02:02:02:02-11000",
)
@pytest.fixture
async def setup_config_entry(
hass: HomeAssistant, config_entry: MockConfigEntry, player_mocks: PlayerMocks
) -> None:
"""Set up the platform."""
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
@pytest.fixture
async def setup_config_entry_secondary(
hass: HomeAssistant,
config_entry_secondary: MockConfigEntry,
player_mocks: PlayerMocks,
) -> None:
"""Set up the platform."""
config_entry_secondary.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry_secondary.entry_id)
await hass.async_block_till_done()
@pytest.fixture
async def player_mocks() -> AsyncGenerator[PlayerMocks]:
"""Mock the player.""" """Mock the player."""
player_mocks = PlayerMocks(
player_data=await PlayerMockData.generate("1.1.1.1"),
player_data_secondary=await PlayerMockData.generate("2.2.2.2"),
player_data_for_already_configured=await PlayerMockData.generate("1.1.1.2"),
)
# to simulate a player that is already configured
player_mocks.player_data_for_already_configured.sync_status_long_polling_mock.get().mac = player_mocks.player_data.sync_status_long_polling_mock.get().mac
def select_player(*args: Any, **kwargs: Any) -> AsyncMock:
match args[0]:
case "1.1.1.1":
return player_mocks.player_data.player
case "2.2.2.2":
return player_mocks.player_data_secondary.player
case "1.1.1.2":
return player_mocks.player_data_for_already_configured.player
case _:
raise ValueError("Invalid player")
with ( with (
patch( patch(
"homeassistant.components.bluesound.Player", autospec=True "homeassistant.components.bluesound.Player", autospec=True
@ -105,28 +210,6 @@ def mock_player(status: Status) -> Generator[AsyncMock]:
new=mock_player, new=mock_player,
), ),
): ):
player = mock_player.return_value mock_player.side_effect = select_player
player.__aenter__.return_value = player
player.status.return_value = status yield player_mocks
player.sync_status.return_value = SyncStatus(
etag="etag",
id="1.1.1.1:11000",
mac="00:11:22:33:44:55",
name="player-name",
image="invalid_url",
initialized=True,
brand="brand",
model="model",
model_name="model-name",
volume_db=0.5,
volume=50,
group=None,
master=None,
slaves=None,
zone=None,
zone_master=None,
zone_slave=None,
mute_volume_db=None,
mute_volume=None,
)
yield player

View File

@ -0,0 +1,31 @@
# serializer version: 1
# name: test_attributes_set
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'player-name1111',
'is_volume_muted': False,
'master': False,
'media_album_name': 'album',
'media_artist': 'artist',
'media_content_type': <MediaType.MUSIC: 'music'>,
'media_duration': 123,
'media_position': 2,
'media_title': 'song',
'shuffle': False,
'source_list': list([
'input1',
'input2',
'preset1',
'preset2',
]),
'supported_features': <MediaPlayerEntityFeature: 196157>,
'volume_level': 0.1,
}),
'context': <ANY>,
'entity_id': 'media_player.player_name1111',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'playing',
})
# ---

View File

@ -11,11 +11,13 @@ from homeassistant.const import CONF_HOST, CONF_PORT
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType from homeassistant.data_entry_flow import FlowResultType
from .conftest import PlayerMocks
from tests.common import MockConfigEntry from tests.common import MockConfigEntry
async def test_user_flow_success( async def test_user_flow_success(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_player: AsyncMock hass: HomeAssistant, mock_setup_entry: AsyncMock, player_mocks: PlayerMocks
) -> None: ) -> None:
"""Test we get the form.""" """Test we get the form."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
@ -33,15 +35,17 @@ async def test_user_flow_success(
) )
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "player-name" assert result["title"] == "player-name1111"
assert result["data"] == {CONF_HOST: "1.1.1.1", CONF_PORT: 11000} assert result["data"] == {CONF_HOST: "1.1.1.1", CONF_PORT: 11000}
assert result["result"].unique_id == "00:11:22:33:44:55-11000" assert result["result"].unique_id == "ff:ff:01:01:01:01-11000"
mock_setup_entry.assert_called_once() mock_setup_entry.assert_called_once()
async def test_user_flow_cannot_connect( async def test_user_flow_cannot_connect(
hass: HomeAssistant, mock_player: AsyncMock, mock_setup_entry: AsyncMock hass: HomeAssistant,
player_mocks: PlayerMocks,
mock_setup_entry: AsyncMock,
) -> None: ) -> None:
"""Test we handle cannot connect error.""" """Test we handle cannot connect error."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
@ -49,7 +53,9 @@ async def test_user_flow_cannot_connect(
context={"source": SOURCE_USER}, context={"source": SOURCE_USER},
) )
mock_player.sync_status.side_effect = PlayerUnreachableError("Player not reachable") player_mocks.player_data.sync_status_long_polling_mock.set_error(
PlayerUnreachableError("Player not reachable")
)
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
{ {
@ -61,7 +67,7 @@ async def test_user_flow_cannot_connect(
assert result["errors"] == {"base": "cannot_connect"} assert result["errors"] == {"base": "cannot_connect"}
assert result["step_id"] == "user" assert result["step_id"] == "user"
mock_player.sync_status.side_effect = None player_mocks.player_data.sync_status_long_polling_mock.set_error(None)
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
{ {
@ -70,7 +76,7 @@ async def test_user_flow_cannot_connect(
) )
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "player-name" assert result["title"] == "player-name1111"
assert result["data"] == { assert result["data"] == {
CONF_HOST: "1.1.1.1", CONF_HOST: "1.1.1.1",
CONF_PORT: 11000, CONF_PORT: 11000,
@ -81,10 +87,11 @@ async def test_user_flow_cannot_connect(
async def test_user_flow_aleady_configured( async def test_user_flow_aleady_configured(
hass: HomeAssistant, hass: HomeAssistant,
mock_player: AsyncMock, player_mocks: PlayerMocks,
mock_config_entry: MockConfigEntry, config_entry: MockConfigEntry,
) -> None: ) -> None:
"""Test we handle already configured.""" """Test we handle already configured."""
config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_USER}, context={"source": SOURCE_USER},
@ -93,7 +100,7 @@ async def test_user_flow_aleady_configured(
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], result["flow_id"],
{ {
CONF_HOST: "1.1.1.1", CONF_HOST: "1.1.1.2",
CONF_PORT: 11000, CONF_PORT: 11000,
}, },
) )
@ -101,13 +108,13 @@ async def test_user_flow_aleady_configured(
assert result["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured" assert result["reason"] == "already_configured"
assert mock_config_entry.data[CONF_HOST] == "1.1.1.1" assert config_entry.data[CONF_HOST] == "1.1.1.2"
mock_player.sync_status.assert_called_once() player_mocks.player_data_for_already_configured.player.sync_status.assert_called_once()
async def test_import_flow_success( async def test_import_flow_success(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_player: AsyncMock hass: HomeAssistant, mock_setup_entry: AsyncMock, player_mocks: PlayerMocks
) -> None: ) -> None:
"""Test we get the form.""" """Test we get the form."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
@ -117,19 +124,21 @@ async def test_import_flow_success(
) )
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "player-name" assert result["title"] == "player-name1111"
assert result["data"] == {CONF_HOST: "1.1.1.1", CONF_PORT: 11000} assert result["data"] == {CONF_HOST: "1.1.1.1", CONF_PORT: 11000}
assert result["result"].unique_id == "00:11:22:33:44:55-11000" assert result["result"].unique_id == "ff:ff:01:01:01:01-11000"
mock_setup_entry.assert_called_once() mock_setup_entry.assert_called_once()
mock_player.sync_status.assert_called_once() player_mocks.player_data.player.sync_status.assert_called_once()
async def test_import_flow_cannot_connect( async def test_import_flow_cannot_connect(
hass: HomeAssistant, mock_player: AsyncMock hass: HomeAssistant, player_mocks: PlayerMocks
) -> None: ) -> None:
"""Test we handle cannot connect error.""" """Test we handle cannot connect error."""
mock_player.sync_status.side_effect = PlayerUnreachableError("Player not reachable") player_mocks.player_data.player.sync_status.side_effect = PlayerUnreachableError(
"Player not reachable"
)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_IMPORT}, context={"source": SOURCE_IMPORT},
@ -139,29 +148,30 @@ async def test_import_flow_cannot_connect(
assert result["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "cannot_connect" assert result["reason"] == "cannot_connect"
mock_player.sync_status.assert_called_once() player_mocks.player_data.player.sync_status.assert_called_once()
async def test_import_flow_already_configured( async def test_import_flow_already_configured(
hass: HomeAssistant, hass: HomeAssistant,
mock_player: AsyncMock, player_mocks: PlayerMocks,
mock_config_entry: MockConfigEntry, config_entry: MockConfigEntry,
) -> None: ) -> None:
"""Test we handle already configured.""" """Test we handle already configured."""
config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_IMPORT}, context={"source": SOURCE_IMPORT},
data={CONF_HOST: "1.1.1.1", CONF_PORT: 11000}, data={CONF_HOST: "1.1.1.2", CONF_PORT: 11000},
) )
assert result["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured" assert result["reason"] == "already_configured"
mock_player.sync_status.assert_called_once() player_mocks.player_data_for_already_configured.player.sync_status.assert_called_once()
async def test_zeroconf_flow_success( async def test_zeroconf_flow_success(
hass: HomeAssistant, mock_setup_entry: AsyncMock, mock_player: AsyncMock hass: HomeAssistant, mock_setup_entry: AsyncMock, player_mocks: PlayerMocks
) -> None: ) -> None:
"""Test we get the form.""" """Test we get the form."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
@ -171,7 +181,7 @@ async def test_zeroconf_flow_success(
ip_address="1.1.1.1", ip_address="1.1.1.1",
ip_addresses=["1.1.1.1"], ip_addresses=["1.1.1.1"],
port=11000, port=11000,
hostname="player-name", hostname="player-name1111",
type="_musc._tcp.local.", type="_musc._tcp.local.",
name="player-name._musc._tcp.local.", name="player-name._musc._tcp.local.",
properties={}, properties={},
@ -182,25 +192,27 @@ async def test_zeroconf_flow_success(
assert result["step_id"] == "confirm" assert result["step_id"] == "confirm"
mock_setup_entry.assert_not_called() mock_setup_entry.assert_not_called()
mock_player.sync_status.assert_called_once() player_mocks.player_data.player.sync_status.assert_called_once()
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={} result["flow_id"], user_input={}
) )
assert result["type"] is FlowResultType.CREATE_ENTRY assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "player-name" assert result["title"] == "player-name1111"
assert result["data"] == {CONF_HOST: "1.1.1.1", CONF_PORT: 11000} assert result["data"] == {CONF_HOST: "1.1.1.1", CONF_PORT: 11000}
assert result["result"].unique_id == "00:11:22:33:44:55-11000" assert result["result"].unique_id == "ff:ff:01:01:01:01-11000"
mock_setup_entry.assert_called_once() mock_setup_entry.assert_called_once()
async def test_zeroconf_flow_cannot_connect( async def test_zeroconf_flow_cannot_connect(
hass: HomeAssistant, mock_player: AsyncMock hass: HomeAssistant, player_mocks: PlayerMocks
) -> None: ) -> None:
"""Test we handle cannot connect error.""" """Test we handle cannot connect error."""
mock_player.sync_status.side_effect = PlayerUnreachableError("Player not reachable") player_mocks.player_data.player.sync_status.side_effect = PlayerUnreachableError(
"Player not reachable"
)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
@ -208,7 +220,7 @@ async def test_zeroconf_flow_cannot_connect(
ip_address="1.1.1.1", ip_address="1.1.1.1",
ip_addresses=["1.1.1.1"], ip_addresses=["1.1.1.1"],
port=11000, port=11000,
hostname="player-name", hostname="player-name1111",
type="_musc._tcp.local.", type="_musc._tcp.local.",
name="player-name._musc._tcp.local.", name="player-name._musc._tcp.local.",
properties={}, properties={},
@ -218,23 +230,24 @@ async def test_zeroconf_flow_cannot_connect(
assert result["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "cannot_connect" assert result["reason"] == "cannot_connect"
mock_player.sync_status.assert_called_once() player_mocks.player_data.player.sync_status.assert_called_once()
async def test_zeroconf_flow_already_configured( async def test_zeroconf_flow_already_configured(
hass: HomeAssistant, hass: HomeAssistant,
mock_player: AsyncMock, player_mocks: PlayerMocks,
mock_config_entry: MockConfigEntry, config_entry: MockConfigEntry,
) -> None: ) -> None:
"""Test we handle already configured and update the host.""" """Test we handle already configured and update the host."""
config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DOMAIN, DOMAIN,
context={"source": SOURCE_ZEROCONF}, context={"source": SOURCE_ZEROCONF},
data=ZeroconfServiceInfo( data=ZeroconfServiceInfo(
ip_address="1.1.1.1", ip_address="1.1.1.2",
ip_addresses=["1.1.1.1"], ip_addresses=["1.1.1.2"],
port=11000, port=11000,
hostname="player-name", hostname="player-name1112",
type="_musc._tcp.local.", type="_musc._tcp.local.",
name="player-name._musc._tcp.local.", name="player-name._musc._tcp.local.",
properties={}, properties={},
@ -244,6 +257,6 @@ async def test_zeroconf_flow_already_configured(
assert result["type"] is FlowResultType.ABORT assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured" assert result["reason"] == "already_configured"
assert mock_config_entry.data[CONF_HOST] == "1.1.1.1" assert config_entry.data[CONF_HOST] == "1.1.1.2"
mock_player.sync_status.assert_called_once() player_mocks.player_data_for_already_configured.player.sync_status.assert_called_once()

View File

@ -0,0 +1,46 @@
"""Test bluesound integration."""
from pyblu.errors import PlayerUnreachableError
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from .conftest import PlayerMocks
from tests.common import MockConfigEntry
async def test_setup_entry(
hass: HomeAssistant, setup_config_entry: None, config_entry: MockConfigEntry
) -> None:
"""Test a successful setup entry."""
assert hass.states.get("media_player.player_name1111").state == "playing"
assert config_entry.state is ConfigEntryState.LOADED
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get("media_player.player_name1111").state == "unavailable"
assert config_entry.state is ConfigEntryState.NOT_LOADED
async def test_unload_entry_while_player_is_offline(
hass: HomeAssistant,
setup_config_entry: None,
config_entry: MockConfigEntry,
player_mocks: PlayerMocks,
) -> None:
"""Test entries can be unloaded correctly while the player is offline."""
player_mocks.player_data.player.status.side_effect = PlayerUnreachableError(
"Player not reachable"
)
player_mocks.player_data.status_long_polling_mock.trigger()
# give the long polling loop a chance to update the state; this could be any async call
await hass.async_block_till_done()
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get("media_player.player_name1111").state == "unavailable"
assert config_entry.state is ConfigEntryState.NOT_LOADED

View File

@ -0,0 +1,327 @@
"""Tests for the Bluesound Media Player platform."""
import dataclasses
from unittest.mock import call
from pyblu import PairedPlayer
from pyblu.errors import PlayerUnreachableError
import pytest
from syrupy.assertion import SnapshotAssertion
from syrupy.filters import props
from homeassistant.components.bluesound import DOMAIN as BLUESOUND_DOMAIN
from homeassistant.components.bluesound.const import (
ATTR_MASTER,
SERVICE_CLEAR_TIMER,
SERVICE_JOIN,
SERVICE_SET_TIMER,
)
from homeassistant.components.media_player import (
ATTR_MEDIA_VOLUME_LEVEL,
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_MEDIA_NEXT_TRACK,
SERVICE_MEDIA_PAUSE,
SERVICE_MEDIA_PLAY,
SERVICE_MEDIA_PREVIOUS_TRACK,
SERVICE_VOLUME_DOWN,
SERVICE_VOLUME_MUTE,
SERVICE_VOLUME_SET,
SERVICE_VOLUME_UP,
MediaPlayerState,
)
from homeassistant.const import ATTR_ENTITY_ID, STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ServiceValidationError
from .conftest import PlayerMocks
@pytest.mark.parametrize(
("service", "method"),
[
(SERVICE_MEDIA_PAUSE, "pause"),
(SERVICE_MEDIA_PLAY, "play"),
(SERVICE_MEDIA_NEXT_TRACK, "skip"),
(SERVICE_MEDIA_PREVIOUS_TRACK, "back"),
],
)
async def test_simple_actions(
hass: HomeAssistant,
setup_config_entry: None,
player_mocks: PlayerMocks,
service: str,
method: str,
) -> None:
"""Test the media player simple actions."""
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
service,
{ATTR_ENTITY_ID: "media_player.player_name1111"},
blocking=True,
)
getattr(player_mocks.player_data.player, method).assert_called_once_with()
async def test_volume_set(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test the media player volume set."""
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_VOLUME_SET,
{ATTR_ENTITY_ID: "media_player.player_name1111", ATTR_MEDIA_VOLUME_LEVEL: 0.5},
blocking=True,
)
player_mocks.player_data.player.volume.assert_called_once_with(level=50)
async def test_volume_mute(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test the media player volume mute."""
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_VOLUME_MUTE,
{ATTR_ENTITY_ID: "media_player.player_name1111", "is_volume_muted": True},
blocking=True,
)
player_mocks.player_data.player.volume.assert_called_once_with(mute=True)
async def test_volume_up(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test the media player volume up."""
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_VOLUME_UP,
{ATTR_ENTITY_ID: "media_player.player_name1111"},
blocking=True,
)
player_mocks.player_data.player.volume.assert_called_once_with(level=11)
async def test_volume_down(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test the media player volume down."""
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_VOLUME_DOWN,
{ATTR_ENTITY_ID: "media_player.player_name1111"},
blocking=True,
)
player_mocks.player_data.player.volume.assert_called_once_with(level=9)
async def test_attributes_set(
hass: HomeAssistant,
setup_config_entry: None,
player_mocks: PlayerMocks,
snapshot: SnapshotAssertion,
) -> None:
"""Test the media player attributes set."""
state = hass.states.get("media_player.player_name1111")
assert state == snapshot(exclude=props("media_position_updated_at"))
async def test_status_updated(
hass: HomeAssistant,
setup_config_entry: None,
player_mocks: PlayerMocks,
) -> None:
"""Test the media player status updated."""
pre_state = hass.states.get("media_player.player_name1111")
assert pre_state.state == "playing"
assert pre_state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.1
status = player_mocks.player_data.status_long_polling_mock.get()
status = dataclasses.replace(status, state="pause", volume=50, etag="changed")
player_mocks.player_data.status_long_polling_mock.set(status)
# give the long polling loop a chance to update the state; this could be any async call
await hass.async_block_till_done()
post_state = hass.states.get("media_player.player_name1111")
assert post_state.state == MediaPlayerState.PAUSED
assert post_state.attributes[ATTR_MEDIA_VOLUME_LEVEL] == 0.5
async def test_unavailable_when_offline(
hass: HomeAssistant,
setup_config_entry: None,
player_mocks: PlayerMocks,
) -> None:
"""Test that the media player goes unavailable when the player is unreachable."""
pre_state = hass.states.get("media_player.player_name1111")
assert pre_state.state == "playing"
player_mocks.player_data.status_long_polling_mock.set_error(
PlayerUnreachableError("Player not reachable")
)
player_mocks.player_data.status_long_polling_mock.trigger()
# give the long polling loop a chance to update the state; this could be any async call
await hass.async_block_till_done()
post_state = hass.states.get("media_player.player_name1111")
assert post_state.state == STATE_UNAVAILABLE
async def test_set_sleep_timer(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test the set sleep timer action."""
await hass.services.async_call(
BLUESOUND_DOMAIN,
SERVICE_SET_TIMER,
{ATTR_ENTITY_ID: "media_player.player_name1111"},
blocking=True,
)
player_mocks.player_data.player.sleep_timer.assert_called_once()
async def test_clear_sleep_timer(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test the clear sleep timer action."""
player_mocks.player_data.player.sleep_timer.side_effect = [15, 30, 45, 60, 90, 0]
await hass.services.async_call(
BLUESOUND_DOMAIN,
SERVICE_CLEAR_TIMER,
{ATTR_ENTITY_ID: "media_player.player_name1111"},
blocking=True,
)
player_mocks.player_data.player.sleep_timer.assert_has_calls([call()] * 6)
async def test_join_cannot_join_to_self(
hass: HomeAssistant, setup_config_entry: None, player_mocks: PlayerMocks
) -> None:
"""Test that joining to self is not allowed."""
with pytest.raises(ServiceValidationError, match="Cannot join player to itself"):
await hass.services.async_call(
BLUESOUND_DOMAIN,
SERVICE_JOIN,
{
ATTR_ENTITY_ID: "media_player.player_name1111",
ATTR_MASTER: "media_player.player_name1111",
},
blocking=True,
)
async def test_join(
hass: HomeAssistant,
setup_config_entry: None,
setup_config_entry_secondary: None,
player_mocks: PlayerMocks,
) -> None:
"""Test the join action."""
await hass.services.async_call(
BLUESOUND_DOMAIN,
SERVICE_JOIN,
{
ATTR_ENTITY_ID: "media_player.player_name1111",
ATTR_MASTER: "media_player.player_name2222",
},
blocking=True,
)
player_mocks.player_data_secondary.player.add_slave.assert_called_once_with(
"1.1.1.1", 11000
)
async def test_unjoin(
hass: HomeAssistant,
setup_config_entry: None,
setup_config_entry_secondary: None,
player_mocks: PlayerMocks,
) -> None:
"""Test the unjoin action."""
updated_sync_status = dataclasses.replace(
player_mocks.player_data.sync_status_long_polling_mock.get(),
master=PairedPlayer("2.2.2.2", 11000),
)
player_mocks.player_data.sync_status_long_polling_mock.set(updated_sync_status)
# give the long polling loop a chance to update the state; this could be any async call
await hass.async_block_till_done()
await hass.services.async_call(
BLUESOUND_DOMAIN,
"unjoin",
{ATTR_ENTITY_ID: "media_player.player_name1111"},
blocking=True,
)
player_mocks.player_data_secondary.player.remove_slave.assert_called_once_with(
"1.1.1.1", 11000
)
async def test_attr_master(
hass: HomeAssistant,
setup_config_entry: None,
player_mocks: PlayerMocks,
) -> None:
"""Test the media player master."""
attr_master = hass.states.get("media_player.player_name1111").attributes[
ATTR_MASTER
]
assert attr_master is False
updated_sync_status = dataclasses.replace(
player_mocks.player_data.sync_status_long_polling_mock.get(),
slaves=[PairedPlayer("2.2.2.2", 11000)],
)
player_mocks.player_data.sync_status_long_polling_mock.set(updated_sync_status)
# give the long polling loop a chance to update the state; this could be any async call
await hass.async_block_till_done()
attr_master = hass.states.get("media_player.player_name1111").attributes[
ATTR_MASTER
]
assert attr_master is True
async def test_attr_bluesound_group(
hass: HomeAssistant,
setup_config_entry: None,
setup_config_entry_secondary: None,
player_mocks: PlayerMocks,
) -> None:
"""Test the media player grouping."""
attr_bluesound_group = hass.states.get(
"media_player.player_name1111"
).attributes.get("bluesound_group")
assert attr_bluesound_group is None
updated_status = dataclasses.replace(
player_mocks.player_data.status_long_polling_mock.get(),
group_name="player-name1111+player-name2222",
)
player_mocks.player_data.status_long_polling_mock.set(updated_status)
# give the long polling loop a chance to update the state; this could be any async call
await hass.async_block_till_done()
attr_bluesound_group = hass.states.get(
"media_player.player_name1111"
).attributes.get("bluesound_group")
assert attr_bluesound_group == ["player-name1111", "player-name2222"]

View File

@ -0,0 +1,70 @@
"""Utils for bluesound tests."""
import asyncio
from typing import Protocol
class Etag(Protocol):
"""Etag protocol."""
etag: str
class LongPollingMock[T: Etag]:
"""Mock long polling methods(status, sync_status)."""
def __init__(self, value: T) -> None:
"""Store value and allows to wait for changes."""
self._value = value
self._error: Exception | None = None
self._event = asyncio.Event()
self._event.set()
def trigger(self):
"""Trigger the event without changing the value."""
self._event.set()
def set(self, value: T):
"""Set the value and notify all waiting."""
self._value = value
self._event.set()
def set_error(self, error: Exception | None):
"""Set the error and notify all waiting."""
self._error = error
self._event.set()
def get(self) -> T:
"""Get the value without waiting."""
return self._value
async def wait(self) -> T:
"""Wait for the value or error to change."""
await self._event.wait()
self._event.clear()
return self._value
def side_effect(self):
"""Return the side_effect for mocking."""
last_etag = None
async def mock(*args, **kwargs) -> T:
nonlocal last_etag
if self._error is not None:
raise self._error
etag = kwargs.get("etag")
if etag is None or etag != last_etag:
last_etag = self.get().etag
return self.get()
value = await self.wait()
last_etag = value.etag
if self._error is not None:
raise self._error
return value
return mock