From ea70d71e8f9c52421be91723ddd94cd50a17b7c0 Mon Sep 17 00:00:00 2001 From: Markus Breitenberger Date: Fri, 1 May 2020 16:46:36 +0200 Subject: [PATCH] Use pulsectl library for PulseAudio connection (#34965) Get rid of internal library code and use pulsectl library to communicate with PulseAudio server. This is a breaking change as the library uses the much more powerful native interface instead of the CLI interface, requiring the need to change the default port. On the bright side, this also solves some issues with the existing implementation: - There was no test if the complete list of loaded modules was already received. If not all data could be read at once, the remaining modules not yet in the buffer were considered absent, resulting in unreliable behavior when a lot of modules were loaded on the server. - A switch could be turned on before the list of loaded modules was loaded, leading to a loopback module being loaded even though this module was already active (#32016). --- .../pulseaudio_loopback/manifest.json | 1 + .../components/pulseaudio_loopback/switch.py | 167 ++++++------------ requirements_all.txt | 3 + 3 files changed, 55 insertions(+), 116 deletions(-) diff --git a/homeassistant/components/pulseaudio_loopback/manifest.json b/homeassistant/components/pulseaudio_loopback/manifest.json index 8775f5f0947..bc38d8c2594 100644 --- a/homeassistant/components/pulseaudio_loopback/manifest.json +++ b/homeassistant/components/pulseaudio_loopback/manifest.json @@ -2,5 +2,6 @@ "domain": "pulseaudio_loopback", "name": "PulseAudio Loopback", "documentation": "https://www.home-assistant.io/integrations/pulseaudio_loopback", + "requirements": ["pulsectl==20.2.4"], "codeowners": [] } diff --git a/homeassistant/components/pulseaudio_loopback/switch.py b/homeassistant/components/pulseaudio_loopback/switch.py index 45577fcd674..9c27ab4e027 100644 --- a/homeassistant/components/pulseaudio_loopback/switch.py +++ b/homeassistant/components/pulseaudio_loopback/switch.py @@ -1,52 +1,32 @@ """Switch logic for loading/unloading pulseaudio loopback modules.""" -from datetime import timedelta import logging -import re -import socket +from pulsectl import Pulse, PulseError import voluptuous as vol -from homeassistant import util from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity from homeassistant.const import CONF_HOST, CONF_NAME, CONF_PORT import homeassistant.helpers.config_validation as cv -_LOGGER = logging.getLogger(__name__) -_PULSEAUDIO_SERVERS = {} +DOMAIN = "pulseaudio_loopback" + +_LOGGER = logging.getLogger(__name__) -CONF_BUFFER_SIZE = "buffer_size" CONF_SINK_NAME = "sink_name" CONF_SOURCE_NAME = "source_name" -CONF_TCP_TIMEOUT = "tcp_timeout" -DEFAULT_BUFFER_SIZE = 1024 -DEFAULT_HOST = "localhost" DEFAULT_NAME = "paloopback" -DEFAULT_PORT = 4712 -DEFAULT_TCP_TIMEOUT = 3 +DEFAULT_PORT = 4713 IGNORED_SWITCH_WARN = "Switch is already in the desired state. Ignoring." -LOAD_CMD = "load-module module-loopback sink={0} source={1}" - -MIN_TIME_BETWEEN_FORCED_SCANS = timedelta(milliseconds=100) -MIN_TIME_BETWEEN_SCANS = timedelta(seconds=10) -MOD_REGEX = ( - r"index: ([0-9]+)\s+name: " - r"\s+argument: (?=<.*sink={0}.*>)(?=<.*source={1}.*>)" -) - -UNLOAD_CMD = "unload-module {0}" - PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( { vol.Required(CONF_SINK_NAME): cv.string, vol.Required(CONF_SOURCE_NAME): cv.string, - vol.Optional(CONF_BUFFER_SIZE, default=DEFAULT_BUFFER_SIZE): cv.positive_int, - vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string, + vol.Optional(CONF_HOST): cv.string, vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port, - vol.Optional(CONF_TCP_TIMEOUT, default=DEFAULT_TCP_TIMEOUT): cv.positive_int, } ) @@ -58,97 +38,62 @@ def setup_platform(hass, config, add_entities, discovery_info=None): source_name = config.get(CONF_SOURCE_NAME) host = config.get(CONF_HOST) port = config.get(CONF_PORT) - buffer_size = config.get(CONF_BUFFER_SIZE) - tcp_timeout = config.get(CONF_TCP_TIMEOUT) + + hass.data.setdefault(DOMAIN, {}) server_id = str.format("{0}:{1}", host, port) - if server_id in _PULSEAUDIO_SERVERS: - server = _PULSEAUDIO_SERVERS[server_id] + if host: + connect_to_server = server_id else: - server = PAServer(host, port, buffer_size, tcp_timeout) - _PULSEAUDIO_SERVERS[server_id] = server + connect_to_server = None - add_entities([PALoopbackSwitch(hass, name, server, sink_name, source_name)]) + if server_id in hass.data[DOMAIN]: + server = hass.data[DOMAIN][server_id] + else: + server = Pulse(server=connect_to_server, connect=False, threading_lock=True) + hass.data[DOMAIN][server_id] = server - -class PAServer: - """Representation of a Pulseaudio server.""" - - _current_module_state = "" - - def __init__(self, host, port, buff_sz, tcp_timeout): - """Initialize PulseAudio server.""" - self._pa_host = host - self._pa_port = int(port) - self._buffer_size = int(buff_sz) - self._tcp_timeout = int(tcp_timeout) - - def _send_command(self, cmd, response_expected): - """Send a command to the pa server using a socket.""" - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(self._tcp_timeout) - try: - sock.connect((self._pa_host, self._pa_port)) - _LOGGER.info("Calling pulseaudio: %s", cmd) - sock.send((cmd + "\n").encode("utf-8")) - if response_expected: - return_data = self._get_full_response(sock) - _LOGGER.debug("Data received from pulseaudio: %s", return_data) - else: - return_data = "" - finally: - sock.close() - return return_data - - def _get_full_response(self, sock): - """Get the full response back from pulseaudio.""" - result = "" - rcv_buffer = sock.recv(self._buffer_size) - result += rcv_buffer.decode("utf-8") - - while len(rcv_buffer) == self._buffer_size: - rcv_buffer = sock.recv(self._buffer_size) - result += rcv_buffer.decode("utf-8") - - return result - - @util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS) - def update_module_state(self): - """Refresh state in case an alternate process modified this data.""" - self._current_module_state = self._send_command("list-modules", True) - - def turn_on(self, sink_name, source_name): - """Send a command to pulseaudio to turn on the loopback.""" - self._send_command(str.format(LOAD_CMD, sink_name, source_name), False) - - def turn_off(self, module_idx): - """Send a command to pulseaudio to turn off the loopback.""" - self._send_command(str.format(UNLOAD_CMD, module_idx), False) - - def get_module_idx(self, sink_name, source_name): - """For a sink/source, return its module id in our cache, if found.""" - result = re.search( - str.format(MOD_REGEX, re.escape(sink_name), re.escape(source_name)), - self._current_module_state, - ) - if result and result.group(1).isdigit(): - return int(result.group(1)) - return -1 + add_entities([PALoopbackSwitch(name, server, sink_name, source_name)], True) class PALoopbackSwitch(SwitchEntity): """Representation the presence or absence of a PA loopback module.""" - def __init__(self, hass, name, pa_server, sink_name, source_name): + def __init__(self, name, pa_server, sink_name, source_name): """Initialize the Pulseaudio switch.""" - self._module_idx = -1 - self._hass = hass + self._module_idx = None self._name = name self._sink_name = sink_name self._source_name = source_name self._pa_svr = pa_server + def _get_module_idx(self): + try: + self._pa_svr.connect() + + for module in self._pa_svr.module_list(): + if not module.name == "module-loopback": + continue + + if f"sink={self._sink_name}" not in module.argument: + continue + + if f"source={self._source_name}" not in module.argument: + continue + + return module.index + + except PulseError: + return None + + return None + + @property + def available(self): + """Return true when connected to server.""" + return self._pa_svr.connected + @property def name(self): """Return the name of the switch.""" @@ -157,35 +102,25 @@ class PALoopbackSwitch(SwitchEntity): @property def is_on(self): """Return true if device is on.""" - return self._module_idx > 0 + return self._module_idx is not None def turn_on(self, **kwargs): """Turn the device on.""" if not self.is_on: - self._pa_svr.turn_on(self._sink_name, self._source_name) - self._pa_svr.update_module_state(no_throttle=True) - self._module_idx = self._pa_svr.get_module_idx( - self._sink_name, self._source_name + self._pa_svr.module_load( + "module-loopback", + args=f"sink={self._sink_name} source={self._source_name}", ) - self.schedule_update_ha_state() else: _LOGGER.warning(IGNORED_SWITCH_WARN) def turn_off(self, **kwargs): """Turn the device off.""" if self.is_on: - self._pa_svr.turn_off(self._module_idx) - self._pa_svr.update_module_state(no_throttle=True) - self._module_idx = self._pa_svr.get_module_idx( - self._sink_name, self._source_name - ) - self.schedule_update_ha_state() + self._pa_svr.module_unload(self._module_idx) else: _LOGGER.warning(IGNORED_SWITCH_WARN) def update(self): """Refresh state in case an alternate process modified this data.""" - self._pa_svr.update_module_state() - self._module_idx = self._pa_svr.get_module_idx( - self._sink_name, self._source_name - ) + self._module_idx = self._get_module_idx() diff --git a/requirements_all.txt b/requirements_all.txt index 62a27f90c38..418bf0bdb9f 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1105,6 +1105,9 @@ ptvsd==4.2.8 # homeassistant.components.wink pubnubsub-handler==1.0.8 +# homeassistant.components.pulseaudio_loopback +pulsectl==20.2.4 + # homeassistant.components.androidtv pure-python-adb==0.2.2.dev0