diff --git a/homeassistant/components/forked_daapd/coordinator.py b/homeassistant/components/forked_daapd/coordinator.py new file mode 100644 index 00000000000..7a03a9075ed --- /dev/null +++ b/homeassistant/components/forked_daapd/coordinator.py @@ -0,0 +1,142 @@ +"""Support forked_daapd media player.""" + +from __future__ import annotations + +import asyncio +import logging + +from homeassistant.exceptions import PlatformNotReady +from homeassistant.helpers.dispatcher import async_dispatcher_send + +from .const import ( + SIGNAL_ADD_ZONES, + SIGNAL_UPDATE_DATABASE, + SIGNAL_UPDATE_MASTER, + SIGNAL_UPDATE_OUTPUTS, + SIGNAL_UPDATE_PLAYER, + SIGNAL_UPDATE_QUEUE, +) + +_LOGGER = logging.getLogger(__name__) + +WS_NOTIFY_EVENT_TYPES = ["player", "outputs", "volume", "options", "queue", "database"] +WEBSOCKET_RECONNECT_TIME = 30 # seconds + + +class ForkedDaapdUpdater: + """Manage updates for the forked-daapd device.""" + + def __init__(self, hass, api, entry_id): + """Initialize.""" + self.hass = hass + self._api = api + self.websocket_handler = None + self._all_output_ids = set() + self._entry_id = entry_id + + async def async_init(self): + """Perform async portion of class initialization.""" + if not (server_config := await self._api.get_request("config")): + raise PlatformNotReady + if websocket_port := server_config.get("websocket_port"): + self.websocket_handler = asyncio.create_task( + self._api.start_websocket_handler( + websocket_port, + WS_NOTIFY_EVENT_TYPES, + self._update, + WEBSOCKET_RECONNECT_TIME, + self._disconnected_callback, + ) + ) + else: + _LOGGER.error("Invalid websocket port") + + async def _disconnected_callback(self): + """Send update signals when the websocket gets disconnected.""" + async_dispatcher_send( + self.hass, SIGNAL_UPDATE_MASTER.format(self._entry_id), False + ) + async_dispatcher_send( + self.hass, SIGNAL_UPDATE_OUTPUTS.format(self._entry_id), [] + ) + + async def _update(self, update_types): + """Private update method.""" + update_types = set(update_types) + update_events = {} + _LOGGER.debug("Updating %s", update_types) + if ( + "queue" in update_types + ): # update queue, queue before player for async_play_media + if queue := await self._api.get_request("queue"): + update_events["queue"] = asyncio.Event() + async_dispatcher_send( + self.hass, + SIGNAL_UPDATE_QUEUE.format(self._entry_id), + queue, + update_events["queue"], + ) + # order of below don't matter + if not {"outputs", "volume"}.isdisjoint(update_types): # update outputs + if outputs := await self._api.get_request("outputs"): + outputs = outputs["outputs"] + update_events["outputs"] = ( + asyncio.Event() + ) # only for master, zones should ignore + async_dispatcher_send( + self.hass, + SIGNAL_UPDATE_OUTPUTS.format(self._entry_id), + outputs, + update_events["outputs"], + ) + self._add_zones(outputs) + if not {"database"}.isdisjoint(update_types): + pipes, playlists = await asyncio.gather( + self._api.get_pipes(), self._api.get_playlists() + ) + update_events["database"] = asyncio.Event() + async_dispatcher_send( + self.hass, + SIGNAL_UPDATE_DATABASE.format(self._entry_id), + pipes, + playlists, + update_events["database"], + ) + if not {"update", "config"}.isdisjoint(update_types): # not supported + _LOGGER.debug("update/config notifications neither requested nor supported") + if not {"player", "options", "volume"}.isdisjoint( + update_types + ): # update player + if player := await self._api.get_request("player"): + update_events["player"] = asyncio.Event() + if update_events.get("queue"): + await update_events[ + "queue" + ].wait() # make sure queue done before player for async_play_media + async_dispatcher_send( + self.hass, + SIGNAL_UPDATE_PLAYER.format(self._entry_id), + player, + update_events["player"], + ) + if update_events: + await asyncio.wait( + [asyncio.create_task(event.wait()) for event in update_events.values()] + ) # make sure callbacks done before update + async_dispatcher_send( + self.hass, SIGNAL_UPDATE_MASTER.format(self._entry_id), True + ) + + def _add_zones(self, outputs): + outputs_to_add = [] + for output in outputs: + if output["id"] not in self._all_output_ids: + self._all_output_ids.add(output["id"]) + outputs_to_add.append(output) + if outputs_to_add: + async_dispatcher_send( + self.hass, + SIGNAL_ADD_ZONES.format(self._entry_id), + self._api, + outputs_to_add, + ) diff --git a/homeassistant/components/forked_daapd/media_player.py b/homeassistant/components/forked_daapd/media_player.py index 0116cc57e7b..8e61df3de45 100644 --- a/homeassistant/components/forked_daapd/media_player.py +++ b/homeassistant/components/forked_daapd/media_player.py @@ -31,7 +31,6 @@ from homeassistant.components.spotify import ( from homeassistant.config_entries import ConfigEntry from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_PORT from homeassistant.core import HomeAssistant, callback -from homeassistant.exceptions import PlatformNotReady from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.dispatcher import ( async_dispatcher_connect, @@ -75,12 +74,10 @@ from .const import ( SUPPORTED_FEATURES_ZONE, TTS_TIMEOUT, ) +from .coordinator import ForkedDaapdUpdater _LOGGER = logging.getLogger(__name__) -WS_NOTIFY_EVENT_TYPES = ["player", "outputs", "volume", "options", "queue", "database"] -WEBSOCKET_RECONNECT_TIME = 30 # seconds - async def async_setup_entry( hass: HomeAssistant, @@ -897,122 +894,3 @@ class ForkedDaapdMaster(MediaPlayerEntity): if url := result.get("artwork_url"): return await self._async_fetch_image(self.api.full_url(url)) return None, None - - -class ForkedDaapdUpdater: - """Manage updates for the forked-daapd device.""" - - def __init__(self, hass, api, entry_id): - """Initialize.""" - self.hass = hass - self._api = api - self.websocket_handler = None - self._all_output_ids = set() - self._entry_id = entry_id - - async def async_init(self): - """Perform async portion of class initialization.""" - if not (server_config := await self._api.get_request("config")): - raise PlatformNotReady - if websocket_port := server_config.get("websocket_port"): - self.websocket_handler = asyncio.create_task( - self._api.start_websocket_handler( - websocket_port, - WS_NOTIFY_EVENT_TYPES, - self._update, - WEBSOCKET_RECONNECT_TIME, - self._disconnected_callback, - ) - ) - else: - _LOGGER.error("Invalid websocket port") - - async def _disconnected_callback(self): - """Send update signals when the websocket gets disconnected.""" - async_dispatcher_send( - self.hass, SIGNAL_UPDATE_MASTER.format(self._entry_id), False - ) - async_dispatcher_send( - self.hass, SIGNAL_UPDATE_OUTPUTS.format(self._entry_id), [] - ) - - async def _update(self, update_types): - """Private update method.""" - update_types = set(update_types) - update_events = {} - _LOGGER.debug("Updating %s", update_types) - if ( - "queue" in update_types - ): # update queue, queue before player for async_play_media - if queue := await self._api.get_request("queue"): - update_events["queue"] = asyncio.Event() - async_dispatcher_send( - self.hass, - SIGNAL_UPDATE_QUEUE.format(self._entry_id), - queue, - update_events["queue"], - ) - # order of below don't matter - if not {"outputs", "volume"}.isdisjoint(update_types): # update outputs - if outputs := await self._api.get_request("outputs"): - outputs = outputs["outputs"] - update_events["outputs"] = ( - asyncio.Event() - ) # only for master, zones should ignore - async_dispatcher_send( - self.hass, - SIGNAL_UPDATE_OUTPUTS.format(self._entry_id), - outputs, - update_events["outputs"], - ) - self._add_zones(outputs) - if not {"database"}.isdisjoint(update_types): - pipes, playlists = await asyncio.gather( - self._api.get_pipes(), self._api.get_playlists() - ) - update_events["database"] = asyncio.Event() - async_dispatcher_send( - self.hass, - SIGNAL_UPDATE_DATABASE.format(self._entry_id), - pipes, - playlists, - update_events["database"], - ) - if not {"update", "config"}.isdisjoint(update_types): # not supported - _LOGGER.debug("update/config notifications neither requested nor supported") - if not {"player", "options", "volume"}.isdisjoint( - update_types - ): # update player - if player := await self._api.get_request("player"): - update_events["player"] = asyncio.Event() - if update_events.get("queue"): - await update_events[ - "queue" - ].wait() # make sure queue done before player for async_play_media - async_dispatcher_send( - self.hass, - SIGNAL_UPDATE_PLAYER.format(self._entry_id), - player, - update_events["player"], - ) - if update_events: - await asyncio.wait( - [asyncio.create_task(event.wait()) for event in update_events.values()] - ) # make sure callbacks done before update - async_dispatcher_send( - self.hass, SIGNAL_UPDATE_MASTER.format(self._entry_id), True - ) - - def _add_zones(self, outputs): - outputs_to_add = [] - for output in outputs: - if output["id"] not in self._all_output_ids: - self._all_output_ids.add(output["id"]) - outputs_to_add.append(output) - if outputs_to_add: - async_dispatcher_send( - self.hass, - SIGNAL_ADD_ZONES.format(self._entry_id), - self._api, - outputs_to_add, - )