Add battery support for Sonos speakers (#49441)

Co-authored-by: Walter Huf <hufman@gmail.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
jjlawren 2021-04-25 12:20:21 -05:00 committed by GitHub
parent 7ecd4f5eed
commit 3be8c9c1c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 886 additions and 292 deletions

View File

@ -1,16 +1,46 @@
"""Support to embed Sonos."""
from __future__ import annotations
import asyncio
import datetime
from functools import partial
import logging
import socket
import pysonos
from pysonos import events_asyncio
from pysonos.core import SoCo
from pysonos.exceptions import SoCoException
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.const import CONF_HOSTS
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOSTS,
EVENT_HOMEASSISTANT_START,
EVENT_HOMEASSISTANT_STOP,
)
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
from .const import DOMAIN
from .const import (
DATA_SONOS,
DISCOVERY_INTERVAL,
DOMAIN,
PLATFORMS,
SONOS_GROUP_UPDATE,
SONOS_SEEN,
)
from .speaker import SonosSpeaker
_LOGGER = logging.getLogger(__name__)
CONF_ADVERTISE_ADDR = "advertise_addr"
CONF_INTERFACE_ADDR = "interface_addr"
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
@ -31,6 +61,19 @@ CONFIG_SCHEMA = vol.Schema(
)
class SonosData:
"""Storage class for platform global data."""
def __init__(self):
"""Initialize the data."""
self.discovered = {}
self.media_player_entities = {}
self.topology_condition = asyncio.Condition()
self.discovery_thread = None
self.hosts_heartbeat = None
self.platforms_ready = set()
async def async_setup(hass, config):
"""Set up the Sonos component."""
conf = config.get(DOMAIN)
@ -47,9 +90,103 @@ async def async_setup(hass, config):
return True
async def async_setup_entry(hass, entry):
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Set up Sonos from a config entry."""
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, MP_DOMAIN)
)
pysonos.config.EVENTS_MODULE = events_asyncio
if DATA_SONOS not in hass.data:
hass.data[DATA_SONOS] = SonosData()
config = hass.data[DOMAIN].get("media_player", {})
_LOGGER.debug("Reached async_setup_entry, config=%s", config)
advertise_addr = config.get(CONF_ADVERTISE_ADDR)
if advertise_addr:
pysonos.config.EVENT_ADVERTISE_IP = advertise_addr
def _stop_discovery(event: Event) -> None:
data = hass.data[DATA_SONOS]
if data.discovery_thread:
data.discovery_thread.stop()
data.discovery_thread = None
if data.hosts_heartbeat:
data.hosts_heartbeat()
data.hosts_heartbeat = None
def _discovery(now: datetime.datetime | None = None) -> None:
"""Discover players from network or configuration."""
hosts = config.get(CONF_HOSTS)
def _discovered_player(soco: SoCo) -> None:
"""Handle a (re)discovered player."""
try:
_LOGGER.debug("Reached _discovered_player, soco=%s", soco)
data = hass.data[DATA_SONOS]
if soco.uid not in data.discovered:
_LOGGER.debug("Adding new speaker")
speaker_info = soco.get_speaker_info(True)
speaker = SonosSpeaker(hass, soco, speaker_info)
data.discovered[soco.uid] = speaker
speaker.setup()
else:
dispatcher_send(hass, f"{SONOS_SEEN}-{soco.uid}", soco)
except SoCoException as ex:
_LOGGER.debug("SoCoException, ex=%s", ex)
if hosts:
for host in hosts:
try:
_LOGGER.debug("Testing %s", host)
player = pysonos.SoCo(socket.gethostbyname(host))
if player.is_visible:
# Make sure that the player is available
_ = player.volume
_discovered_player(player)
except (OSError, SoCoException) as ex:
_LOGGER.debug("Exception %s", ex)
if now is None:
_LOGGER.warning("Failed to initialize '%s'", host)
_LOGGER.debug("Tested all hosts")
hass.data[DATA_SONOS].hosts_heartbeat = hass.helpers.event.call_later(
DISCOVERY_INTERVAL.total_seconds(), _discovery
)
else:
_LOGGER.debug("Starting discovery thread")
hass.data[DATA_SONOS].discovery_thread = pysonos.discover_thread(
_discovered_player,
interval=DISCOVERY_INTERVAL.total_seconds(),
interface_addr=config.get(CONF_INTERFACE_ADDR),
)
hass.data[DATA_SONOS].discovery_thread.name = "Sonos-Discovery"
@callback
def _async_signal_update_groups(event):
async_dispatcher_send(hass, SONOS_GROUP_UPDATE)
@callback
def start_discovery():
_LOGGER.debug("Adding discovery job")
hass.async_add_executor_job(_discovery)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_discovery)
hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_START, _async_signal_update_groups
)
@callback
def platform_ready(platform, _):
hass.data[DATA_SONOS].platforms_ready.add(platform)
if hass.data[DATA_SONOS].platforms_ready == PLATFORMS:
start_discovery()
for platform in PLATFORMS:
task = hass.async_create_task(
hass.config_entries.async_forward_entry_setup(entry, platform)
)
task.add_done_callback(partial(platform_ready, platform))
return True

View File

@ -1,4 +1,7 @@
"""Const for Sonos."""
import datetime
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.media_player.const import (
MEDIA_CLASS_ALBUM,
MEDIA_CLASS_ARTIST,
@ -15,9 +18,11 @@ from homeassistant.components.media_player.const import (
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_TRACK,
)
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
DOMAIN = "sonos"
DATA_SONOS = "sonos_media_player"
PLATFORMS = {MP_DOMAIN, SENSOR_DOMAIN}
SONOS_ARTIST = "artists"
SONOS_ALBUM = "albums"
@ -121,3 +126,20 @@ PLAYABLE_MEDIA_TYPES = [
MEDIA_TYPE_PLAYLIST,
MEDIA_TYPE_TRACK,
]
SONOS_CONTENT_UPDATE = "sonos_content_update"
SONOS_DISCOVERY_UPDATE = "sonos_discovery_update"
SONOS_ENTITY_CREATED = "sonos_entity_created"
SONOS_ENTITY_UPDATE = "sonos_entity_update"
SONOS_GROUP_UPDATE = "sonos_group_update"
SONOS_MEDIA_UPDATE = "sonos_media_update"
SONOS_PROPERTIES_UPDATE = "sonos_properties_update"
SONOS_PLAYER_RECONNECTED = "sonos_player_reconnected"
SONOS_STATE_UPDATED = "sonos_state_updated"
SONOS_VOLUME_UPDATE = "sonos_properties_update"
SONOS_SEEN = "sonos_seen"
BATTERY_SCAN_INTERVAL = datetime.timedelta(minutes=15)
SCAN_INTERVAL = datetime.timedelta(seconds=10)
DISCOVERY_INTERVAL = datetime.timedelta(seconds=60)
SEEN_EXPIRE_TIME = 3.5 * DISCOVERY_INTERVAL

View File

@ -0,0 +1,79 @@
"""Entity representing a Sonos player."""
from __future__ import annotations
import logging
from typing import Any
from pysonos.core import SoCo
from homeassistant.core import callback
import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from . import SonosData
from .const import DOMAIN, SONOS_ENTITY_UPDATE, SONOS_STATE_UPDATED
from .speaker import SonosSpeaker
_LOGGER = logging.getLogger(__name__)
class SonosEntity(Entity):
"""Representation of a Sonos entity."""
def __init__(self, speaker: SonosSpeaker, sonos_data: SonosData):
"""Initialize a SonosEntity."""
self.speaker = speaker
self.data = sonos_data
async def async_added_to_hass(self) -> None:
"""Handle common setup when added to hass."""
await self.speaker.async_seen()
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_ENTITY_UPDATE}-{self.soco.uid}",
self.async_update, # pylint: disable=no-member
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_STATE_UPDATED}-{self.soco.uid}",
self.async_write_state,
)
)
@property
def soco(self) -> SoCo:
"""Return the speaker SoCo instance."""
return self.speaker.soco
@property
def device_info(self) -> dict[str, Any]:
"""Return information about the device."""
return {
"identifiers": {(DOMAIN, self.soco.uid)},
"name": self.speaker.zone_name,
"model": self.speaker.model_name.replace("Sonos ", ""),
"sw_version": self.speaker.version,
"connections": {(dr.CONNECTION_NETWORK_MAC, self.speaker.mac_address)},
"manufacturer": "Sonos",
"suggested_area": self.speaker.zone_name,
}
@property
def available(self) -> bool:
"""Return whether this device is available."""
return self.speaker.available
@property
def should_poll(self) -> bool:
"""Return that we should not be polled (we handle that internally)."""
return False
@callback
def async_write_state(self) -> None:
"""Flush the current entity state."""
self.async_write_ha_state()

View File

@ -7,13 +7,11 @@ from contextlib import suppress
import datetime
import functools as ft
import logging
import socket
from typing import Any, Callable
import urllib.parse
import async_timeout
import pysonos
from pysonos import alarms, events_asyncio
from pysonos import alarms
from pysonos.core import (
MUSIC_SRC_LINE_IN,
MUSIC_SRC_RADIO,
@ -23,7 +21,7 @@ from pysonos.core import (
SoCo,
)
from pysonos.data_structures import DidlFavorite
from pysonos.events_base import Event, SubscriptionBase
from pysonos.events_base import Event as SonosEvent
from pysonos.exceptions import SoCoException, SoCoUPnPException
import pysonos.music_library
import pysonos.snapshot
@ -32,6 +30,7 @@ import voluptuous as vol
from homeassistant.components.media_player import MediaPlayerEntity
from homeassistant.components.media_player.const import (
ATTR_MEDIA_ENQUEUE,
DOMAIN as MP_DOMAIN,
MEDIA_TYPE_ALBUM,
MEDIA_TYPE_ARTIST,
MEDIA_TYPE_MUSIC,
@ -59,35 +58,36 @@ from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.plex.const import PLEX_URI_SCHEME
from homeassistant.components.plex.services import play_on_sonos
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
ATTR_TIME,
CONF_HOSTS,
EVENT_HOMEASSISTANT_STOP,
STATE_IDLE,
STATE_PAUSED,
STATE_PLAYING,
)
from homeassistant.const import ATTR_TIME, STATE_IDLE, STATE_PAUSED, STATE_PLAYING
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.helpers import config_validation as cv, entity_platform, service
import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.network import is_internal_request
from homeassistant.util.dt import utcnow
from . import CONF_ADVERTISE_ADDR, CONF_INTERFACE_ADDR
from . import SonosData
from .const import (
DATA_SONOS,
DOMAIN as SONOS_DOMAIN,
MEDIA_TYPES_TO_SONOS,
PLAYABLE_MEDIA_TYPES,
SONOS_CONTENT_UPDATE,
SONOS_DISCOVERY_UPDATE,
SONOS_ENTITY_CREATED,
SONOS_GROUP_UPDATE,
SONOS_MEDIA_UPDATE,
SONOS_PLAYER_RECONNECTED,
SONOS_VOLUME_UPDATE,
)
from .entity import SonosEntity
from .media_browser import build_item_response, get_media, library_payload
from .speaker import SonosSpeaker
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = 10
DISCOVERY_INTERVAL = 60
SEEN_EXPIRE_TIME = 3.5 * DISCOVERY_INTERVAL
SUPPORT_SONOS = (
SUPPORT_BROWSE_MEDIA
| SUPPORT_CLEAR_PLAYLIST
@ -146,98 +146,17 @@ ATTR_STATUS_LIGHT = "status_light"
UNAVAILABLE_VALUES = {"", "NOT_IMPLEMENTED", None}
class SonosData:
"""Storage class for platform global data."""
def __init__(self) -> None:
"""Initialize the data."""
self.entities: list[SonosEntity] = []
self.discovered: list[str] = []
self.topology_condition = asyncio.Condition()
self.discovery_thread = None
self.hosts_heartbeat = None
async def async_setup_entry( # noqa: C901
async def async_setup_entry(
hass: HomeAssistant, config_entry: ConfigEntry, async_add_entities: Callable
) -> None:
"""Set up Sonos from a config entry."""
if DATA_SONOS not in hass.data:
hass.data[DATA_SONOS] = SonosData()
config = hass.data[SONOS_DOMAIN].get("media_player", {})
_LOGGER.debug("Reached async_setup_entry, config=%s", config)
pysonos.config.EVENTS_MODULE = events_asyncio
advertise_addr = config.get(CONF_ADVERTISE_ADDR)
if advertise_addr:
pysonos.config.EVENT_ADVERTISE_IP = advertise_addr
def _stop_discovery(event: Event) -> None:
data = hass.data[DATA_SONOS]
if data.discovery_thread:
data.discovery_thread.stop()
data.discovery_thread = None
if data.hosts_heartbeat:
data.hosts_heartbeat()
data.hosts_heartbeat = None
def _discovery(now: datetime.datetime | None = None) -> None:
"""Discover players from network or configuration."""
hosts = config.get(CONF_HOSTS)
def _discovered_player(soco: SoCo) -> None:
"""Handle a (re)discovered player."""
try:
_LOGGER.debug("Reached _discovered_player, soco=%s", soco)
if soco.uid not in hass.data[DATA_SONOS].discovered:
_LOGGER.debug("Adding new entity")
hass.data[DATA_SONOS].discovered.append(soco.uid)
hass.add_job(async_add_entities, [SonosEntity(soco)])
else:
entity = _get_entity_from_soco_uid(hass, soco.uid)
if entity and (entity.soco == soco or not entity.available):
_LOGGER.debug("Seen %s", entity)
hass.add_job(entity.async_seen(soco)) # type: ignore
except SoCoException as ex:
_LOGGER.debug("SoCoException, ex=%s", ex)
if hosts:
for host in hosts:
try:
_LOGGER.debug("Testing %s", host)
player = pysonos.SoCo(socket.gethostbyname(host))
if player.is_visible:
# Make sure that the player is available
_ = player.volume
_discovered_player(player)
except (OSError, SoCoException) as ex:
_LOGGER.debug("Exception %s", ex)
if now is None:
_LOGGER.warning("Failed to initialize '%s'", host)
_LOGGER.debug("Tested all hosts")
hass.data[DATA_SONOS].hosts_heartbeat = hass.helpers.event.call_later(
DISCOVERY_INTERVAL, _discovery
)
else:
_LOGGER.debug("Starting discovery thread")
hass.data[DATA_SONOS].discovery_thread = pysonos.discover_thread(
_discovered_player,
interval=DISCOVERY_INTERVAL,
interface_addr=config.get(CONF_INTERFACE_ADDR),
)
hass.data[DATA_SONOS].discovery_thread.name = "Sonos-Discovery"
_LOGGER.debug("Adding discovery job")
hass.async_add_executor_job(_discovery)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _stop_discovery)
platform = entity_platform.current_platform.get()
@callback
def async_create_entities(speaker: SonosSpeaker) -> None:
"""Handle device discovery and create entities."""
async_add_entities([SonosMediaPlayerEntity(speaker, hass.data[DATA_SONOS])])
@service.verify_domain_control(hass, SONOS_DOMAIN)
async def async_service_handle(service_call: ServiceCall) -> None:
"""Handle dispatched services."""
@ -248,28 +167,30 @@ async def async_setup_entry( # noqa: C901
return
for entity in entities:
assert isinstance(entity, SonosEntity)
assert isinstance(entity, SonosMediaPlayerEntity)
if service_call.service == SERVICE_JOIN:
master = platform.entities.get(service_call.data[ATTR_MASTER])
if master:
await SonosEntity.join_multi(hass, master, entities) # type: ignore[arg-type]
await SonosMediaPlayerEntity.join_multi(hass, master, entities) # type: ignore[arg-type]
else:
_LOGGER.error(
"Invalid master specified for join service: %s",
service_call.data[ATTR_MASTER],
)
elif service_call.service == SERVICE_UNJOIN:
await SonosEntity.unjoin_multi(hass, entities) # type: ignore[arg-type]
await SonosMediaPlayerEntity.unjoin_multi(hass, entities) # type: ignore[arg-type]
elif service_call.service == SERVICE_SNAPSHOT:
await SonosEntity.snapshot_multi(
await SonosMediaPlayerEntity.snapshot_multi(
hass, entities, service_call.data[ATTR_WITH_GROUP] # type: ignore[arg-type]
)
elif service_call.service == SERVICE_RESTORE:
await SonosEntity.restore_multi(
await SonosMediaPlayerEntity.restore_multi(
hass, entities, service_call.data[ATTR_WITH_GROUP] # type: ignore[arg-type]
)
async_dispatcher_connect(hass, SONOS_DISCOVERY_UPDATE, async_create_entities)
hass.services.async_register(
SONOS_DOMAIN,
SERVICE_JOIN,
@ -343,13 +264,11 @@ async def async_setup_entry( # noqa: C901
)
def _get_entity_from_soco_uid(hass: HomeAssistant, uid: str) -> SonosEntity | None:
"""Return SonosEntity from SoCo uid."""
entities: list[SonosEntity] = hass.data[DATA_SONOS].entities
for entity in entities:
if uid == entity.unique_id:
return entity
return None
def _get_entity_from_soco_uid(
hass: HomeAssistant, uid: str
) -> SonosMediaPlayerEntity | None:
"""Return SonosMediaPlayerEntity from SoCo uid."""
return hass.data[DATA_SONOS].media_player_entities.get(uid) # type: ignore[no-any-return]
def soco_error(errorcodes: list[str] | None = None) -> Callable:
@ -378,7 +297,7 @@ def soco_coordinator(funct: Callable) -> Callable:
"""Call function on coordinator."""
@ft.wraps(funct)
def wrapper(entity: SonosEntity, *args: Any, **kwargs: Any) -> Any:
def wrapper(entity: SonosMediaPlayerEntity, *args: Any, **kwargs: Any) -> Any:
"""Wrap for call to coordinator."""
if entity.is_coordinator:
return funct(entity, *args, **kwargs)
@ -396,22 +315,18 @@ def _timespan_secs(timespan: str | None) -> None | float:
return sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":"))))
class SonosEntity(MediaPlayerEntity):
class SonosMediaPlayerEntity(SonosEntity, MediaPlayerEntity):
"""Representation of a Sonos entity."""
def __init__(self, player: SoCo) -> None:
def __init__(self, speaker: SonosSpeaker, sonos_data: SonosData) -> None:
"""Initialize the Sonos entity."""
self._subscriptions: list[SubscriptionBase] = []
self._poll_timer: Callable | None = None
self._seen_timer: Callable | None = None
super().__init__(speaker, sonos_data)
self._volume_increment = 2
self._unique_id: str = player.uid
self._player: SoCo = player
self._player_volume: int | None = None
self._player_muted: bool | None = None
self._play_mode: str | None = None
self._coordinator: SonosEntity | None = None
self._sonos_group: list[SonosEntity] = [self]
self._coordinator: SonosMediaPlayerEntity | None = None
self._sonos_group: list[SonosMediaPlayerEntity] = [self]
self._status: str | None = None
self._uri: str | None = None
self._media_library = pysonos.music_library.MusicLibrary(self.soco)
@ -429,28 +344,59 @@ class SonosEntity(MediaPlayerEntity):
self._source_name: str | None = None
self._favorites: list[DidlFavorite] = []
self._soco_snapshot: pysonos.snapshot.Snapshot | None = None
self._snapshot_group: list[SonosEntity] | None = None
# Set these early since device_info() needs them
speaker_info: dict = self.soco.get_speaker_info(True)
self._name: str = speaker_info["zone_name"]
self._model: str = speaker_info["model_name"]
self._sw_version: str = speaker_info["software_version"]
self._mac_address: str = speaker_info["mac_address"]
self._snapshot_group: list[SonosMediaPlayerEntity] | None = None
async def async_added_to_hass(self) -> None:
"""Subscribe sonos events."""
await self.async_seen(self.soco)
self.data.media_player_entities[self.unique_id] = self
await self.async_reconnect_player()
await super().async_added_to_hass()
self.hass.data[DATA_SONOS].entities.append(self)
self.async_on_remove(
async_dispatcher_connect(
self.hass, SONOS_GROUP_UPDATE, self.async_update_groups
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_CONTENT_UPDATE}-{self.soco.uid}",
self.async_update_content,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_MEDIA_UPDATE}-{self.soco.uid}",
self.async_update_media,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_VOLUME_UPDATE}-{self.soco.uid}",
self.async_update_volume,
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_PLAYER_RECONNECTED}-{self.soco.uid}",
self.async_reconnect_player,
)
)
for entity in self.hass.data[DATA_SONOS].entities:
await entity.create_update_groups_coro()
if self.hass.is_running:
async_dispatcher_send(self.hass, SONOS_GROUP_UPDATE)
async_dispatcher_send(
self.hass, f"{SONOS_ENTITY_CREATED}-{self.soco.uid}", MP_DOMAIN
)
@property
def unique_id(self) -> str:
"""Return a unique ID."""
return self._unique_id
return self.soco.uid # type: ignore[no-any-return]
def __hash__(self) -> int:
"""Return a hash of self."""
@ -459,20 +405,7 @@ class SonosEntity(MediaPlayerEntity):
@property
def name(self) -> str:
"""Return the name of the entity."""
return self._name
@property
def device_info(self) -> dict:
"""Return information about the device."""
return {
"identifiers": {(SONOS_DOMAIN, self._unique_id)},
"name": self._name,
"model": self._model.replace("Sonos ", ""),
"sw_version": self._sw_version,
"connections": {(dr.CONNECTION_NETWORK_MAC, self._mac_address)},
"manufacturer": "Sonos",
"suggested_area": self._name,
}
return self.speaker.zone_name # type: ignore[no-any-return]
@property # type: ignore[misc]
@soco_coordinator
@ -496,65 +429,11 @@ class SonosEntity(MediaPlayerEntity):
"""Return true if player is a coordinator."""
return self._coordinator is None
@property
def soco(self) -> SoCo:
"""Return soco object."""
return self._player
@property
def coordinator(self) -> SoCo:
"""Return coordinator of this player."""
return self._coordinator
async def async_seen(self, player: SoCo) -> None:
"""Record that this player was seen right now."""
was_available = self.available
_LOGGER.debug("Async seen: %s, was_available: %s", player, was_available)
self._player = player
if self._seen_timer:
self._seen_timer()
self._seen_timer = self.hass.helpers.event.async_call_later(
SEEN_EXPIRE_TIME, self.async_unseen
)
if was_available:
return
self._poll_timer = self.hass.helpers.event.async_track_time_interval(
self.update, datetime.timedelta(seconds=SCAN_INTERVAL)
)
done = await self._async_attach_player()
if not done:
assert self._seen_timer is not None
self._seen_timer()
await self.async_unseen()
self.async_write_ha_state()
async def async_unseen(self, now: datetime.datetime | None = None) -> None:
"""Make this player unavailable when it was not seen recently."""
self._seen_timer = None
if self._poll_timer:
self._poll_timer()
self._poll_timer = None
for subscription in self._subscriptions:
await subscription.unsubscribe()
self._subscriptions = []
self.async_write_ha_state()
@property
def available(self) -> bool:
"""Return True if entity is available."""
return self._seen_timer is not None
def _clear_media_position(self) -> None:
"""Clear the media_position."""
self._media_position = None
@ -572,49 +451,23 @@ class SonosEntity(MediaPlayerEntity):
# Skip unknown types
_LOGGER.error("Unhandled favorite '%s': %s", fav.title, ex)
def _attach_player(self) -> None:
"""Get basic information and add event subscriptions."""
async def async_reconnect_player(self) -> None:
"""Set basic information when player is reconnected."""
await self.hass.async_add_executor_job(self._reconnect_player)
def _reconnect_player(self) -> None:
"""Set basic information when player is reconnected."""
self._play_mode = self.soco.play_mode
self.update_volume()
self._set_favorites()
async def _async_attach_player(self) -> bool:
"""Get basic information and add event subscriptions."""
try:
await self.hass.async_add_executor_job(self._attach_player)
player = self.soco
if self._subscriptions:
raise RuntimeError(
f"Attempted to attach subscriptions to player: {player} "
f"when existing subscriptions exist: {self._subscriptions}"
)
await self._subscribe(player.avTransport, self.async_update_media)
await self._subscribe(player.renderingControl, self.async_update_volume)
await self._subscribe(player.zoneGroupTopology, self.async_update_groups)
await self._subscribe(player.contentDirectory, self.async_update_content)
return True
except SoCoException as ex:
_LOGGER.warning("Could not connect %s: %s", self.entity_id, ex)
return False
async def _subscribe(
self, target: SubscriptionBase, sub_callback: Callable
) -> None:
"""Create a sonos subscription."""
subscription = await target.subscribe(auto_renew=True)
subscription.callback = sub_callback
self._subscriptions.append(subscription)
@property
def should_poll(self) -> bool:
"""Return that we should not be polled (we handle that internally)."""
return False
def update(self, now: datetime.datetime | None = None) -> None:
async def async_update(self, now: datetime.datetime | None = None) -> None:
"""Retrieve latest state."""
await self.hass.async_add_executor_job(self._update, now)
def _update(self, now: datetime.datetime | None = None) -> None:
"""Retrieve latest state."""
_LOGGER.debug("Polling speaker %s", self.speaker.zone_name)
try:
self.update_groups()
self.update_volume()
@ -624,11 +477,11 @@ class SonosEntity(MediaPlayerEntity):
pass
@callback
def async_update_media(self, event: Event | None = None) -> None:
def async_update_media(self, event: SonosEvent | None = None) -> None:
"""Update information about currently playing media."""
self.hass.async_add_executor_job(self.update_media, event)
def update_media(self, event: Event | None = None) -> None:
def update_media(self, event: SonosEvent | None = None) -> None:
"""Update information about currently playing media."""
variables = event and event.variables
@ -685,7 +538,8 @@ class SonosEntity(MediaPlayerEntity):
self.schedule_update_ha_state()
# Also update slaves
for entity in self.hass.data[DATA_SONOS].entities:
entities = self.data.media_player_entities.values()
for entity in entities:
coordinator = entity.coordinator
if coordinator and coordinator.unique_id == self.unique_id:
entity.schedule_update_ha_state()
@ -774,7 +628,7 @@ class SonosEntity(MediaPlayerEntity):
self._queue_position = playlist_position - 1
@callback
def async_update_volume(self, event: Event) -> None:
def async_update_volume(self, event: SonosEvent) -> None:
"""Update information about currently volume settings."""
variables = event.variables
@ -799,20 +653,22 @@ class SonosEntity(MediaPlayerEntity):
self._night_sound = self.soco.night_mode
self._speech_enhance = self.soco.dialog_mode
def update_groups(self, event: Event | None = None) -> None:
def update_groups(self, event: SonosEvent | None = None) -> None:
"""Handle callback for topology change event."""
coro = self.create_update_groups_coro(event)
if coro:
self.hass.add_job(coro) # type: ignore
@callback
def async_update_groups(self, event: Event | None = None) -> None:
def async_update_groups(self, event: SonosEvent | None = None) -> None:
"""Handle callback for topology change event."""
coro = self.create_update_groups_coro(event)
if coro:
self.hass.async_add_job(coro) # type: ignore
def create_update_groups_coro(self, event: Event | None = None) -> Coroutine | None:
def create_update_groups_coro(
self, event: SonosEvent | None = None
) -> Coroutine | None:
"""Handle callback for topology change event."""
def _get_soco_group() -> list[str]:
@ -831,7 +687,7 @@ class SonosEntity(MediaPlayerEntity):
return [coordinator_uid] + slave_uids
async def _async_extract_group(event: Event) -> list[str]:
async def _async_extract_group(event: SonosEvent) -> list[str]:
"""Extract group layout from a topology event."""
group = event and event.zone_player_uui_ds_in_group
if group:
@ -859,22 +715,18 @@ class SonosEntity(MediaPlayerEntity):
# pylint: disable=protected-access
slave._coordinator = self
slave._sonos_group = sonos_group
slave.async_schedule_update_ha_state()
slave.async_write_ha_state()
async def _async_handle_group_event(event: Event) -> None:
async def _async_handle_group_event(event: SonosEvent) -> None:
"""Get async lock and handle event."""
if event and self._poll_timer:
# Cancel poll timer since we do receive events
self._poll_timer()
self._poll_timer = None
async with self.hass.data[DATA_SONOS].topology_condition:
async with self.data.topology_condition:
group = await _async_extract_group(event)
if self.unique_id == group[0]:
_async_regroup(group)
self.hass.data[DATA_SONOS].topology_condition.notify_all()
self.data.topology_condition.notify_all()
if event and not hasattr(event, "zone_player_uui_ds_in_group"):
return None
@ -882,7 +734,7 @@ class SonosEntity(MediaPlayerEntity):
return _async_handle_group_event(event)
@callback
def async_update_content(self, event: Event | None = None) -> None:
def async_update_content(self, event: SonosEvent | None = None) -> None:
"""Update information about available content."""
if event and "favorites_update_id" in event.variables:
self.hass.async_add_job(self._set_favorites)
@ -992,12 +844,12 @@ class SonosEntity(MediaPlayerEntity):
@soco_error()
def volume_up(self) -> None:
"""Volume up media player."""
self._player.volume += self._volume_increment
self.soco.volume += self._volume_increment
@soco_error()
def volume_down(self) -> None:
"""Volume down media player."""
self._player.volume -= self._volume_increment
self.soco.volume -= self._volume_increment
@soco_error()
def set_volume_level(self, volume: str) -> None:
@ -1054,7 +906,7 @@ class SonosEntity(MediaPlayerEntity):
"""List of available input sources."""
sources = [fav.title for fav in self._favorites]
model = self._model.upper()
model = self.speaker.model_name.upper()
if "PLAY:5" in model or "CONNECT" in model:
sources += [SOURCE_LINEIN]
elif "PLAYBAR" in model:
@ -1168,7 +1020,9 @@ class SonosEntity(MediaPlayerEntity):
_LOGGER.error('Sonos does not support a media type of "%s"', media_type)
@soco_error()
def join(self, slaves: list[SonosEntity]) -> list[SonosEntity]:
def join(
self, slaves: list[SonosMediaPlayerEntity]
) -> list[SonosMediaPlayerEntity]:
"""Form a group with other players."""
if self._coordinator:
self.unjoin()
@ -1188,14 +1042,16 @@ class SonosEntity(MediaPlayerEntity):
@staticmethod
async def join_multi(
hass: HomeAssistant, master: SonosEntity, entities: list[SonosEntity]
hass: HomeAssistant,
master: SonosMediaPlayerEntity,
entities: list[SonosMediaPlayerEntity],
) -> None:
"""Form a group with other players."""
async with hass.data[DATA_SONOS].topology_condition:
group: list[SonosEntity] = await hass.async_add_executor_job(
group: list[SonosMediaPlayerEntity] = await hass.async_add_executor_job(
master.join, entities
)
await SonosEntity.wait_for_groups(hass, [group])
await SonosMediaPlayerEntity.wait_for_groups(hass, [group])
@soco_error()
def unjoin(self) -> None:
@ -1204,10 +1060,12 @@ class SonosEntity(MediaPlayerEntity):
self._coordinator = None
@staticmethod
async def unjoin_multi(hass: HomeAssistant, entities: list[SonosEntity]) -> None:
async def unjoin_multi(
hass: HomeAssistant, entities: list[SonosMediaPlayerEntity]
) -> None:
"""Unjoin several players from their group."""
def _unjoin_all(entities: list[SonosEntity]) -> None:
def _unjoin_all(entities: list[SonosMediaPlayerEntity]) -> None:
"""Sync helper."""
# Unjoin slaves first to prevent inheritance of queues
coordinators = [e for e in entities if e.is_coordinator]
@ -1218,7 +1076,7 @@ class SonosEntity(MediaPlayerEntity):
async with hass.data[DATA_SONOS].topology_condition:
await hass.async_add_executor_job(_unjoin_all, entities)
await SonosEntity.wait_for_groups(hass, [[e] for e in entities])
await SonosMediaPlayerEntity.wait_for_groups(hass, [[e] for e in entities])
@soco_error()
def snapshot(self, with_group: bool) -> None:
@ -1232,12 +1090,12 @@ class SonosEntity(MediaPlayerEntity):
@staticmethod
async def snapshot_multi(
hass: HomeAssistant, entities: list[SonosEntity], with_group: bool
hass: HomeAssistant, entities: list[SonosMediaPlayerEntity], with_group: bool
) -> None:
"""Snapshot all the entities and optionally their groups."""
# pylint: disable=protected-access
def _snapshot_all(entities: list[SonosEntity]) -> None:
def _snapshot_all(entities: list[SonosMediaPlayerEntity]) -> None:
"""Sync helper."""
for entity in entities:
entity.snapshot(with_group)
@ -1266,14 +1124,14 @@ class SonosEntity(MediaPlayerEntity):
@staticmethod
async def restore_multi(
hass: HomeAssistant, entities: list[SonosEntity], with_group: bool
hass: HomeAssistant, entities: list[SonosMediaPlayerEntity], with_group: bool
) -> None:
"""Restore snapshots for all the entities."""
# pylint: disable=protected-access
def _restore_groups(
entities: list[SonosEntity], with_group: bool
) -> list[list[SonosEntity]]:
entities: list[SonosMediaPlayerEntity], with_group: bool
) -> list[list[SonosMediaPlayerEntity]]:
"""Pause all current coordinators and restore groups."""
for entity in (e for e in entities if e.is_coordinator):
if entity.state == STATE_PLAYING:
@ -1296,7 +1154,7 @@ class SonosEntity(MediaPlayerEntity):
return groups
def _restore_players(entities: list[SonosEntity]) -> None:
def _restore_players(entities: list[SonosMediaPlayerEntity]) -> None:
"""Restore state of all players."""
for entity in (e for e in entities if not e.is_coordinator):
entity.restore()
@ -1316,18 +1174,18 @@ class SonosEntity(MediaPlayerEntity):
_restore_groups, entities_set, with_group
)
await SonosEntity.wait_for_groups(hass, groups)
await SonosMediaPlayerEntity.wait_for_groups(hass, groups)
await hass.async_add_executor_job(_restore_players, entities_set)
@staticmethod
async def wait_for_groups(
hass: HomeAssistant, groups: list[list[SonosEntity]]
hass: HomeAssistant, groups: list[list[SonosMediaPlayerEntity]]
) -> None:
"""Wait until all groups are present, or timeout."""
# pylint: disable=protected-access
def _test_groups(groups: list[list[SonosEntity]]) -> bool:
def _test_groups(groups: list[list[SonosMediaPlayerEntity]]) -> bool:
"""Return whether all groups exist now."""
for group in groups:
coordinator = group[0]
@ -1350,7 +1208,7 @@ class SonosEntity(MediaPlayerEntity):
except asyncio.TimeoutError:
_LOGGER.warning("Timeout waiting for target groups %s", groups)
for entity in hass.data[DATA_SONOS].entities:
for entity in hass.data[DATA_SONOS].media_player_entities.values():
entity.soco._zgs_cache.clear()
@soco_error()

View File

@ -0,0 +1,204 @@
"""Entity representing a Sonos battery level."""
from __future__ import annotations
import contextlib
import datetime
import logging
from typing import Any
from pysonos.core import SoCo
from pysonos.events_base import Event as SonosEvent
from pysonos.exceptions import SoCoException
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.const import DEVICE_CLASS_BATTERY, PERCENTAGE, STATE_UNKNOWN
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.icon import icon_for_battery_level
from homeassistant.util import dt as dt_util
from . import SonosData
from .const import (
BATTERY_SCAN_INTERVAL,
DATA_SONOS,
SONOS_DISCOVERY_UPDATE,
SONOS_ENTITY_CREATED,
SONOS_PROPERTIES_UPDATE,
)
from .entity import SonosEntity
from .speaker import SonosSpeaker
_LOGGER = logging.getLogger(__name__)
ATTR_BATTERY_LEVEL = "battery_level"
ATTR_BATTERY_CHARGING = "charging"
ATTR_BATTERY_POWERSOURCE = "power_source"
EVENT_CHARGING = {
"CHARGING": True,
"NOT_CHARGING": False,
}
def fetch_battery_info_or_none(soco: SoCo) -> dict[str, Any] | None:
"""Fetch battery_info from the given SoCo object.
Returns None if the device doesn't support battery info
or if the device is offline.
"""
with contextlib.suppress(ConnectionError, TimeoutError, SoCoException):
return soco.get_battery_info()
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up Sonos from a config entry."""
sonos_data = hass.data[DATA_SONOS]
async def _async_create_entity(speaker: SonosSpeaker) -> SonosBatteryEntity | None:
if battery_info := await hass.async_add_executor_job(
fetch_battery_info_or_none, speaker.soco
):
return SonosBatteryEntity(speaker, sonos_data, battery_info)
return None
async def _async_create_entities(speaker: SonosSpeaker):
if entity := await _async_create_entity(speaker):
async_add_entities([entity])
else:
async_dispatcher_send(
hass, f"{SONOS_ENTITY_CREATED}-{speaker.soco.uid}", SENSOR_DOMAIN
)
async_dispatcher_connect(hass, SONOS_DISCOVERY_UPDATE, _async_create_entities)
class SonosBatteryEntity(SonosEntity, Entity):
"""Representation of a Sonos Battery entity."""
def __init__(
self, speaker: SonosSpeaker, sonos_data: SonosData, battery_info: dict[str, Any]
):
"""Initialize a SonosBatteryEntity."""
super().__init__(speaker, sonos_data)
self._battery_info: dict[str, Any] = battery_info
self._last_event: datetime.datetime = None
async def async_added_to_hass(self) -> None:
"""Register polling callback when added to hass."""
await super().async_added_to_hass()
self.async_on_remove(
self.hass.helpers.event.async_track_time_interval(
self.async_update, BATTERY_SCAN_INTERVAL
)
)
self.async_on_remove(
async_dispatcher_connect(
self.hass,
f"{SONOS_PROPERTIES_UPDATE}-{self.soco.uid}",
self.async_update_battery_info,
)
)
async_dispatcher_send(
self.hass, f"{SONOS_ENTITY_CREATED}-{self.soco.uid}", SENSOR_DOMAIN
)
async def async_update_battery_info(self, event: SonosEvent = None) -> None:
"""Update battery info using the provided SonosEvent."""
if event is None:
return
if (more_info := event.variables.get("more_info")) is None:
return
more_info_dict = dict(x.split(":") for x in more_info.split(","))
self._last_event = dt_util.utcnow()
is_charging = EVENT_CHARGING[more_info_dict["BattChg"]]
if is_charging == self.charging:
self._battery_info.update({"Level": int(more_info_dict["BattPct"])})
else:
if battery_info := await self.hass.async_add_executor_job(
fetch_battery_info_or_none, self.soco
):
self._battery_info = battery_info
self.async_write_ha_state()
@property
def unique_id(self) -> str:
"""Return the unique ID of the sensor."""
return f"{self.soco.uid}-battery"
@property
def name(self) -> str:
"""Return the name of the sensor."""
return f"{self.speaker.zone_name} Battery"
@property
def device_class(self) -> str:
"""Return the entity's device class."""
return DEVICE_CLASS_BATTERY
@property
def unit_of_measurement(self) -> str:
"""Get the unit of measurement."""
return PERCENTAGE
async def async_update(self, event=None) -> None:
"""Poll the device for the current state."""
if not self.available:
# wait for the Sonos device to come back online
return
if (
self._last_event
and dt_util.utcnow() - self._last_event < BATTERY_SCAN_INTERVAL
):
return
if battery_info := await self.hass.async_add_executor_job(
fetch_battery_info_or_none, self.soco
):
self._battery_info = battery_info
self.async_write_ha_state()
@property
def battery_level(self) -> int:
"""Return the battery level."""
return self._battery_info.get("Level", 0)
@property
def power_source(self) -> str:
"""Return the name of the power source.
Observed to be either BATTERY or SONOS_CHARGING_RING or USB_POWER.
"""
return self._battery_info.get("PowerSource", STATE_UNKNOWN)
@property
def charging(self) -> bool:
"""Return the charging status of this battery."""
return self.power_source not in ("BATTERY", STATE_UNKNOWN)
@property
def icon(self) -> str:
"""Return the icon of the sensor."""
return icon_for_battery_level(self.battery_level, self.charging)
@property
def state(self) -> int | None:
"""Return the state of the sensor."""
return self._battery_info.get("Level")
@property
def device_state_attributes(self) -> dict[str, Any]:
"""Return entity specific state attributes."""
return {
ATTR_BATTERY_CHARGING: self.charging,
ATTR_BATTERY_POWERSOURCE: self.power_source,
}

View File

@ -0,0 +1,217 @@
"""Base class for common speaker tasks."""
from __future__ import annotations
from asyncio import gather
import datetime
import logging
from typing import Any, Callable
from pysonos.core import SoCo
from pysonos.events_base import Event as SonosEvent, SubscriptionBase
from pysonos.exceptions import SoCoException
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import (
async_dispatcher_send,
dispatcher_connect,
dispatcher_send,
)
from .const import (
PLATFORMS,
SCAN_INTERVAL,
SEEN_EXPIRE_TIME,
SONOS_CONTENT_UPDATE,
SONOS_DISCOVERY_UPDATE,
SONOS_ENTITY_CREATED,
SONOS_ENTITY_UPDATE,
SONOS_GROUP_UPDATE,
SONOS_MEDIA_UPDATE,
SONOS_PLAYER_RECONNECTED,
SONOS_PROPERTIES_UPDATE,
SONOS_SEEN,
SONOS_STATE_UPDATED,
SONOS_VOLUME_UPDATE,
)
_LOGGER = logging.getLogger(__name__)
class SonosSpeaker:
"""Representation of a Sonos speaker."""
def __init__(self, hass: HomeAssistant, soco: SoCo, speaker_info: dict[str, Any]):
"""Initialize a SonosSpeaker."""
self._is_ready: bool = False
self._subscriptions: list[SubscriptionBase] = []
self._poll_timer: Callable | None = None
self._seen_timer: Callable | None = None
self._seen_dispatcher: Callable | None = None
self._entity_creation_dispatcher: Callable | None = None
self._platforms_ready: set[str] = set()
self.hass: HomeAssistant = hass
self.soco: SoCo = soco
self.mac_address = speaker_info["mac_address"]
self.model_name = speaker_info["model_name"]
self.version = speaker_info["software_version"]
self.zone_name = speaker_info["zone_name"]
def setup(self) -> None:
"""Run initial setup of the speaker."""
self._entity_creation_dispatcher = dispatcher_connect(
self.hass,
f"{SONOS_ENTITY_CREATED}-{self.soco.uid}",
self.async_handle_new_entity,
)
self._seen_dispatcher = dispatcher_connect(
self.hass, f"{SONOS_SEEN}-{self.soco.uid}", self.async_seen
)
dispatcher_send(self.hass, SONOS_DISCOVERY_UPDATE, self)
async def async_handle_new_entity(self, entity_type: str) -> None:
"""Listen to new entities to trigger first subscription."""
self._platforms_ready.add(entity_type)
if self._platforms_ready == PLATFORMS:
await self.async_subscribe()
self._is_ready = True
@callback
def async_write_entity_states(self) -> bool:
"""Write states for associated SonosEntity instances."""
async_dispatcher_send(self.hass, f"{SONOS_STATE_UPDATED}-{self.soco.uid}")
@property
def available(self) -> bool:
"""Return whether this speaker is available."""
return self._seen_timer is not None
async def async_subscribe(self) -> bool:
"""Initiate event subscriptions."""
_LOGGER.debug("Creating subscriptions for %s", self.zone_name)
try:
self.async_dispatch_player_reconnected()
if self._subscriptions:
raise RuntimeError(
f"Attempted to attach subscriptions to player: {self.soco} "
f"when existing subscriptions exist: {self._subscriptions}"
)
await gather(
self._subscribe(self.soco.avTransport, self.async_dispatch_media),
self._subscribe(self.soco.renderingControl, self.async_dispatch_volume),
self._subscribe(
self.soco.contentDirectory, self.async_dispatch_content
),
self._subscribe(
self.soco.zoneGroupTopology, self.async_dispatch_groups
),
self._subscribe(
self.soco.deviceProperties, self.async_dispatch_properties
),
)
return True
except SoCoException as ex:
_LOGGER.warning("Could not connect %s: %s", self.zone_name, ex)
return False
async def _subscribe(
self, target: SubscriptionBase, sub_callback: Callable
) -> None:
"""Create a Sonos subscription."""
subscription = await target.subscribe(auto_renew=True)
subscription.callback = sub_callback
self._subscriptions.append(subscription)
@callback
def async_dispatch_media(self, event: SonosEvent | None = None) -> None:
"""Update currently playing media from event."""
async_dispatcher_send(self.hass, f"{SONOS_MEDIA_UPDATE}-{self.soco.uid}", event)
@callback
def async_dispatch_content(self, event: SonosEvent | None = None) -> None:
"""Update available content from event."""
async_dispatcher_send(
self.hass, f"{SONOS_CONTENT_UPDATE}-{self.soco.uid}", event
)
@callback
def async_dispatch_volume(self, event: SonosEvent | None = None) -> None:
"""Update volume from event."""
async_dispatcher_send(
self.hass, f"{SONOS_VOLUME_UPDATE}-{self.soco.uid}", event
)
@callback
def async_dispatch_properties(self, event: SonosEvent | None = None) -> None:
"""Update properties from event."""
async_dispatcher_send(
self.hass, f"{SONOS_PROPERTIES_UPDATE}-{self.soco.uid}", event
)
@callback
def async_dispatch_groups(self, event: SonosEvent | None = None) -> None:
"""Update groups from event."""
if event and self._poll_timer:
_LOGGER.debug(
"Received event, cancelling poll timer for %s", self.zone_name
)
self._poll_timer()
self._poll_timer = None
async_dispatcher_send(self.hass, SONOS_GROUP_UPDATE, event)
@callback
def async_dispatch_player_reconnected(self) -> None:
"""Signal that player has been reconnected."""
async_dispatcher_send(self.hass, f"{SONOS_PLAYER_RECONNECTED}-{self.soco.uid}")
async def async_seen(self, soco: SoCo | None = None) -> None:
"""Record that this speaker was seen right now."""
if soco is not None:
self.soco = soco
was_available = self.available
_LOGGER.debug("Async seen: %s, was_available: %s", self.soco, was_available)
if self._seen_timer:
self._seen_timer()
self._seen_timer = self.hass.helpers.event.async_call_later(
SEEN_EXPIRE_TIME.total_seconds(), self.async_unseen
)
if was_available:
self.async_write_entity_states()
return
self._poll_timer = self.hass.helpers.event.async_track_time_interval(
async_dispatcher_send(self.hass, f"{SONOS_ENTITY_UPDATE}-{self.soco.uid}"),
SCAN_INTERVAL,
)
if self._is_ready:
done = await self.async_subscribe()
if not done:
assert self._seen_timer is not None
self._seen_timer()
await self.async_unseen()
self.async_write_entity_states()
async def async_unseen(self, now: datetime.datetime | None = None) -> None:
"""Make this player unavailable when it was not seen recently."""
self.async_write_entity_states()
self._seen_timer = None
if self._poll_timer:
self._poll_timer()
self._poll_timer = None
for subscription in self._subscriptions:
await subscription.unsubscribe()
self._subscriptions = []

View File

@ -17,7 +17,7 @@ def config_entry_fixture():
@pytest.fixture(name="soco")
def soco_fixture(music_library, speaker_info, dummy_soco_service):
def soco_fixture(music_library, speaker_info, battery_info, dummy_soco_service):
"""Create a mock pysonos SoCo fixture."""
with patch("pysonos.SoCo", autospec=True) as mock, patch(
"socket.gethostbyname", return_value="192.168.42.2"
@ -31,10 +31,12 @@ def soco_fixture(music_library, speaker_info, dummy_soco_service):
mock_soco.renderingControl = dummy_soco_service
mock_soco.zoneGroupTopology = dummy_soco_service
mock_soco.contentDirectory = dummy_soco_service
mock_soco.deviceProperties = dummy_soco_service
mock_soco.mute = False
mock_soco.night_mode = True
mock_soco.dialog_mode = True
mock_soco.volume = 19
mock_soco.get_battery_info.return_value = battery_info
yield mock_soco
@ -82,3 +84,14 @@ def speaker_info_fixture():
"software_version": "49.2-64250",
"mac_address": "00-11-22-33-44-55",
}
@pytest.fixture(name="battery_info")
def battery_info_fixture():
"""Create battery_info fixture."""
return {
"Health": "GREEN",
"Level": 100,
"Temperature": "NORMAL",
"PowerSource": "SONOS_CHARGING_RING",
}

View File

@ -20,7 +20,8 @@ async def test_async_setup_entry_hosts(hass, config_entry, config, soco):
"""Test static setup."""
await setup_platform(hass, config_entry, config)
entity = hass.data[media_player.DATA_SONOS].entities[0]
entities = list(hass.data[media_player.DATA_SONOS].media_player_entities.values())
entity = entities[0]
assert entity.soco == soco
@ -28,7 +29,8 @@ async def test_async_setup_entry_discover(hass, config_entry, discover):
"""Test discovery setup."""
await setup_platform(hass, config_entry, {})
entity = hass.data[media_player.DATA_SONOS].entities[0]
entities = list(hass.data[media_player.DATA_SONOS].media_player_entities.values())
entity = entities[0]
assert entity.unique_id == "RINCON_test"

View File

@ -0,0 +1,62 @@
"""Tests for the Sonos battery sensor platform."""
from pysonos.exceptions import NotSupportedException
from homeassistant.components.sonos import DOMAIN
from homeassistant.setup import async_setup_component
async def setup_platform(hass, config_entry, config):
"""Set up the media player platform for testing."""
config_entry.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, config)
await hass.async_block_till_done()
async def test_entity_registry_unsupported(hass, config_entry, config, soco):
"""Test sonos device without battery registered in the device registry."""
soco.get_battery_info.side_effect = NotSupportedException
await setup_platform(hass, config_entry, config)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
assert "media_player.zone_a" in entity_registry.entities
assert "sensor.zone_a_battery" not in entity_registry.entities
async def test_entity_registry_supported(hass, config_entry, config, soco):
"""Test sonos device with battery registered in the device registry."""
await setup_platform(hass, config_entry, config)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
assert "media_player.zone_a" in entity_registry.entities
assert "sensor.zone_a_battery" in entity_registry.entities
async def test_battery_missing_attributes(hass, config_entry, config, soco):
"""Test sonos device with unknown battery state."""
soco.get_battery_info.return_value = {}
await setup_platform(hass, config_entry, config)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
assert entity_registry.entities.get("sensor.zone_a_battery") is None
async def test_battery_attributes(hass, config_entry, config, soco):
"""Test sonos device with battery state."""
await setup_platform(hass, config_entry, config)
entity_registry = await hass.helpers.entity_registry.async_get_registry()
battery = entity_registry.entities["sensor.zone_a_battery"]
battery_state = hass.states.get(battery.entity_id)
# confirm initial state from conftest
assert battery_state.state == "100"
assert battery_state.attributes.get("unit_of_measurement") == "%"
assert battery_state.attributes.get("icon") == "mdi:battery-charging-100"
assert battery_state.attributes.get("charging")
assert battery_state.attributes.get("power_source") == "SONOS_CHARGING_RING"