diff --git a/homeassistant/components/voip/manifest.json b/homeassistant/components/voip/manifest.json index 3c154936209..b9439ee682c 100644 --- a/homeassistant/components/voip/manifest.json +++ b/homeassistant/components/voip/manifest.json @@ -7,5 +7,5 @@ "documentation": "https://www.home-assistant.io/integrations/voip", "iot_class": "local_push", "quality_scale": "internal", - "requirements": ["voip-utils==0.0.2"] + "requirements": ["voip-utils==0.0.5"] } diff --git a/homeassistant/components/voip/not_configured.raw b/homeassistant/components/voip/not_configured.raw new file mode 100644 index 00000000000..0c12f876ec9 Binary files /dev/null and b/homeassistant/components/voip/not_configured.raw differ diff --git a/homeassistant/components/voip/tone.raw b/homeassistant/components/voip/tone.raw new file mode 100644 index 00000000000..175e072a27b Binary files /dev/null and b/homeassistant/components/voip/tone.raw differ diff --git a/homeassistant/components/voip/voip.py b/homeassistant/components/voip/voip.py index 38211ad96b6..6dee1a9153e 100644 --- a/homeassistant/components/voip/voip.py +++ b/homeassistant/components/voip/voip.py @@ -3,8 +3,10 @@ from __future__ import annotations import asyncio from collections import deque -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, MutableSequence, Sequence +from functools import partial import logging +from pathlib import Path import time from typing import TYPE_CHECKING @@ -22,6 +24,7 @@ from homeassistant.components.assist_pipeline import ( from homeassistant.components.assist_pipeline.vad import VoiceCommandSegmenter from homeassistant.const import __version__ from homeassistant.core import Context, HomeAssistant +from homeassistant.util.ulid import ulid from .const import DOMAIN @@ -29,6 +32,9 @@ if TYPE_CHECKING: from .devices import VoIPDevice, VoIPDevices _BUFFERED_CHUNKS_BEFORE_SPEECH = 100 # ~2 seconds +_TONE_DELAY = 0.2 # seconds before playing tone +_MESSAGE_DELAY = 1.0 # seconds before playing "not configured" message +_LOOP_DELAY = 2.0 # seconds before replaying not-configured message _LOGGER = logging.getLogger(__name__) @@ -44,11 +50,14 @@ class HassVoipDatagramProtocol(VoipDatagramProtocol): session_name="voip_hass", version=__version__, ), - protocol_factory=lambda call_info: PipelineRtpDatagramProtocol( + valid_protocol_factory=lambda call_info: PipelineRtpDatagramProtocol( hass, hass.config.language, devices.async_get_or_create(call_info), ), + invalid_protocol_factory=lambda call_info: NotConfiguredRtpDatagramProtocol( + hass, + ), ) self.hass = hass self.devices = devices @@ -69,6 +78,7 @@ class PipelineRtpDatagramProtocol(RtpDatagramProtocol): voip_device: VoIPDevice, pipeline_timeout: float = 30.0, audio_timeout: float = 2.0, + listening_tone_enabled: bool = True, ) -> None: """Set up pipeline RTP server.""" # STT expects 16Khz mono with 16-bit samples @@ -80,11 +90,14 @@ class PipelineRtpDatagramProtocol(RtpDatagramProtocol): self.pipeline: Pipeline | None = None self.pipeline_timeout = pipeline_timeout self.audio_timeout = audio_timeout + self.listening_tone_enabled = listening_tone_enabled self._audio_queue: asyncio.Queue[bytes] = asyncio.Queue() self._context = Context() self._conversation_id: str | None = None self._pipeline_task: asyncio.Task | None = None + self._session_id: str | None = None + self._tone_bytes: bytes | None = None def connection_made(self, transport): """Server is ready.""" @@ -113,23 +126,42 @@ class PipelineRtpDatagramProtocol(RtpDatagramProtocol): self, ) -> None: """Forward audio to pipeline STT and handle TTS.""" - _LOGGER.debug("Starting pipeline") - - async def stt_stream(): - try: - async for chunk in self._segment_audio(): - yield chunk - except asyncio.TimeoutError: - # Expected after caller hangs up - _LOGGER.debug("Audio timeout") - - if self.transport is not None: - self.transport.close() - self.transport = None - finally: - self._clear_audio_queue() + if self._session_id is None: + self._session_id = ulid() + if self.listening_tone_enabled: + await self._play_listening_tone() try: + # Wait for speech before starting pipeline + segmenter = VoiceCommandSegmenter() + chunk_buffer: deque[bytes] = deque( + maxlen=_BUFFERED_CHUNKS_BEFORE_SPEECH, + ) + speech_detected = await self._wait_for_speech( + segmenter, + chunk_buffer, + ) + if not speech_detected: + _LOGGER.debug("No speech detected") + return + + _LOGGER.debug("Starting pipeline") + + async def stt_stream(): + try: + async for chunk in self._segment_audio( + segmenter, + chunk_buffer, + ): + yield chunk + except asyncio.TimeoutError: + # Expected after caller hangs up + _LOGGER.debug("Audio timeout") + self._session_id = None + self.disconnect() + finally: + self._clear_audio_queue() + # Run pipeline with a timeout async with async_timeout.timeout(self.pipeline_timeout): await async_pipeline_from_audio_stream( @@ -155,17 +187,48 @@ class PipelineRtpDatagramProtocol(RtpDatagramProtocol): except asyncio.TimeoutError: # Expected after caller hangs up _LOGGER.debug("Pipeline timeout") - - if self.transport is not None: - self.transport.close() - self.transport = None + self._session_id = None + self.disconnect() finally: # Allow pipeline to run again self._pipeline_task = None - async def _segment_audio(self) -> AsyncIterable[bytes]: - segmenter = VoiceCommandSegmenter() - chunk_buffer: deque[bytes] = deque(maxlen=_BUFFERED_CHUNKS_BEFORE_SPEECH) + async def _wait_for_speech( + self, + segmenter: VoiceCommandSegmenter, + chunk_buffer: MutableSequence[bytes], + ): + """Buffer audio chunks until speech is detected. + + Returns True if speech was detected, False otherwise. + """ + # Timeout if no audio comes in for a while. + # This means the caller hung up. + async with async_timeout.timeout(self.audio_timeout): + chunk = await self._audio_queue.get() + + while chunk: + segmenter.process(chunk) + if segmenter.in_command: + return True + + # Buffer until command starts + chunk_buffer.append(chunk) + + async with async_timeout.timeout(self.audio_timeout): + chunk = await self._audio_queue.get() + + return False + + async def _segment_audio( + self, + segmenter: VoiceCommandSegmenter, + chunk_buffer: Sequence[bytes], + ) -> AsyncIterable[bytes]: + """Yield audio chunks until voice command has finished.""" + # Buffered chunks first + for buffered_chunk in chunk_buffer: + yield buffered_chunk # Timeout if no audio comes in for a while. # This means the caller hung up. @@ -177,18 +240,7 @@ class PipelineRtpDatagramProtocol(RtpDatagramProtocol): # Voice command is finished break - if segmenter.in_command: - if chunk_buffer: - # Release audio in buffer first - for buffered_chunk in chunk_buffer: - yield buffered_chunk - - chunk_buffer.clear() - - yield chunk - else: - # Buffer until command starts - chunk_buffer.append(chunk) + yield chunk async with async_timeout.timeout(self.audio_timeout): chunk = await self._audio_queue.get() @@ -225,4 +277,74 @@ class PipelineRtpDatagramProtocol(RtpDatagramProtocol): _LOGGER.debug("Sending %s byte(s) of audio", len(audio_bytes)) # Assume TTS audio is 16Khz 16-bit mono - await self.send_audio(audio_bytes, rate=16000, width=2, channels=1) + await self.hass.async_add_executor_job( + partial(self.send_audio, audio_bytes, rate=16000, width=2, channels=1) + ) + + async def _play_listening_tone(self) -> None: + """Play a tone to indicate that Home Assistant is listening.""" + if self._tone_bytes is None: + # Do I/O in executor + self._tone_bytes = await self.hass.async_add_executor_job( + self._load_tone, + ) + + await self.hass.async_add_executor_job( + partial( + self.send_audio, + self._tone_bytes, + rate=16000, + width=2, + channels=1, + silence_before=_TONE_DELAY, + ) + ) + + def _load_tone(self) -> bytes: + """Load raw tone audio (16Khz, 16-bit mono).""" + return (Path(__file__).parent / "tone.raw").read_bytes() + + +class NotConfiguredRtpDatagramProtocol(RtpDatagramProtocol): + """Plays audio on a loop to inform the user to configure the phone in Home Assistant.""" + + def __init__(self, hass: HomeAssistant) -> None: + """Set up RTP server.""" + super().__init__(rate=16000, width=2, channels=1) + self.hass = hass + self._audio_task: asyncio.Task | None = None + self._audio_bytes: bytes | None = None + + def on_chunk(self, audio_bytes: bytes) -> None: + """Handle raw audio chunk.""" + if self.transport is None: + return + + if self._audio_bytes is None: + # 16Khz, 16-bit mono audio message + self._audio_bytes = ( + Path(__file__).parent / "not_configured.raw" + ).read_bytes() + + if self._audio_task is None: + self._audio_task = self.hass.async_create_background_task( + self._play_message(), + "voip_not_connected", + ) + + async def _play_message(self) -> None: + await self.hass.async_add_executor_job( + partial( + self.send_audio, + self._audio_bytes, + 16000, + 2, + 1, + silence_before=_MESSAGE_DELAY, + ) + ) + + await asyncio.sleep(_LOOP_DELAY) + + # Allow message to play again + self._audio_task = None diff --git a/requirements_all.txt b/requirements_all.txt index ee9f5d32f4f..88800303363 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -2591,7 +2591,7 @@ venstarcolortouch==0.19 vilfo-api-client==0.3.2 # homeassistant.components.voip -voip-utils==0.0.2 +voip-utils==0.0.5 # homeassistant.components.volkszaehler volkszaehler==0.4.0 diff --git a/requirements_test_all.txt b/requirements_test_all.txt index e8c5cd07073..09e748968c3 100644 --- a/requirements_test_all.txt +++ b/requirements_test_all.txt @@ -1867,7 +1867,7 @@ venstarcolortouch==0.19 vilfo-api-client==0.3.2 # homeassistant.components.voip -voip-utils==0.0.2 +voip-utils==0.0.5 # homeassistant.components.volvooncall volvooncall==0.10.2 diff --git a/tests/components/voip/test_voip.py b/tests/components/voip/test_voip.py index 74eccdc4382..3a6b7bdc0c0 100644 --- a/tests/components/voip/test_voip.py +++ b/tests/components/voip/test_voip.py @@ -35,7 +35,6 @@ async def test_pipeline( async for _chunk in stt_stream: # Stream will end when VAD detects end of "speech" assert _chunk != bad_chunk - pass # Test empty data event_callback( @@ -84,14 +83,17 @@ async def test_pipeline( new=async_get_media_source_audio, ): rtp_protocol = voip.voip.PipelineRtpDatagramProtocol( - hass, hass.config.language, voip_device + hass, + hass.config.language, + voip_device, + listening_tone_enabled=False, ) rtp_protocol.transport = Mock() # Ensure audio queue is cleared before pipeline starts rtp_protocol._audio_queue.put_nowait(bad_chunk) - async def send_audio(*args, **kwargs): + def send_audio(*args, **kwargs): # Test finished successfully done.set() @@ -123,9 +125,16 @@ async def test_pipeline_timeout(hass: HomeAssistant, voip_device: VoIPDevice) -> with patch( "homeassistant.components.voip.voip.async_pipeline_from_audio_stream", new=async_pipeline_from_audio_stream, + ), patch( + "homeassistant.components.voip.voip.PipelineRtpDatagramProtocol._wait_for_speech", + return_value=True, ): rtp_protocol = voip.voip.PipelineRtpDatagramProtocol( - hass, hass.config.language, voip_device, pipeline_timeout=0.001 + hass, + hass.config.language, + voip_device, + pipeline_timeout=0.001, + listening_tone_enabled=False, ) transport = Mock(spec=["close"]) rtp_protocol.connection_made(transport) @@ -158,7 +167,11 @@ async def test_stt_stream_timeout(hass: HomeAssistant, voip_device: VoIPDevice) new=async_pipeline_from_audio_stream, ): rtp_protocol = voip.voip.PipelineRtpDatagramProtocol( - hass, hass.config.language, voip_device, audio_timeout=0.001 + hass, + hass.config.language, + voip_device, + audio_timeout=0.001, + listening_tone_enabled=False, ) transport = Mock(spec=["close"]) rtp_protocol.connection_made(transport)