diff --git a/CODEOWNERS b/CODEOWNERS index 752bbb31460..89b0cf16af0 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1678,8 +1678,8 @@ build.json @home-assistant/supervisor /tests/components/vlc_telnet/ @rodripf @MartinHjelmare /homeassistant/components/vodafone_station/ @paoloantinori @chemelli74 /tests/components/vodafone_station/ @paoloantinori @chemelli74 -/homeassistant/components/voip/ @balloob @synesthesiam -/tests/components/voip/ @balloob @synesthesiam +/homeassistant/components/voip/ @balloob @synesthesiam @jaminh +/tests/components/voip/ @balloob @synesthesiam @jaminh /homeassistant/components/volumio/ @OnFreund /tests/components/volumio/ @OnFreund /homeassistant/components/volvooncall/ @molobrakos diff --git a/homeassistant/components/voip/assist_satellite.py b/homeassistant/components/voip/assist_satellite.py index a2364200ce2..7b34d7a11ba 100644 --- a/homeassistant/components/voip/assist_satellite.py +++ b/homeassistant/components/voip/assist_satellite.py @@ -51,9 +51,9 @@ if TYPE_CHECKING: _LOGGER = logging.getLogger(__name__) _PIPELINE_TIMEOUT_SEC: Final = 30 +_HANGUP_SEC: Final = 0.5 _ANNOUNCEMENT_BEFORE_DELAY: Final = 0.5 _ANNOUNCEMENT_AFTER_DELAY: Final = 1.0 -_ANNOUNCEMENT_HANGUP_SEC: Final = 0.5 _ANNOUNCEMENT_RING_TIMEOUT: Final = 30 @@ -132,9 +132,10 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol self._processing_tone_done = asyncio.Event() self._announcement: AssistSatelliteAnnouncement | None = None - self._announcement_future: asyncio.Future[Any] = asyncio.Future() self._announcment_start_time: float = 0.0 - self._check_announcement_ended_task: asyncio.Task | None = None + self._check_announcement_pickup_task: asyncio.Task | None = None + self._check_hangup_task: asyncio.Task | None = None + self._call_end_future: asyncio.Future[Any] = asyncio.Future() self._last_chunk_time: float | None = None self._rtp_port: int | None = None self._run_pipeline_after_announce: bool = False @@ -233,7 +234,7 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol translation_key="non_tts_announcement", ) - self._announcement_future = asyncio.Future() + self._call_end_future = asyncio.Future() self._run_pipeline_after_announce = run_pipeline_after if self._rtp_port is None: @@ -274,53 +275,77 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol rtp_port=self._rtp_port, ) - # Check if caller hung up or didn't pick up - self._check_announcement_ended_task = ( + # Check if caller didn't pick up + self._check_announcement_pickup_task = ( self.config_entry.async_create_background_task( self.hass, - self._check_announcement_ended(), - "voip_announcement_ended", + self._check_announcement_pickup(), + "voip_announcement_pickup", ) ) try: - await self._announcement_future + await self._call_end_future except TimeoutError: # Stop ringing + _LOGGER.debug("Caller did not pick up in time") sip_protocol.cancel_call(call_info) raise - async def _check_announcement_ended(self) -> None: + async def _check_announcement_pickup(self) -> None: """Continuously checks if an audio chunk was received within a time limit. - If not, the caller is presumed to have hung up and the announcement is ended. + If not, the caller is presumed to have not picked up the phone and the announcement is ended. """ - while self._announcement is not None: + while True: current_time = time.monotonic() if (self._last_chunk_time is None) and ( (current_time - self._announcment_start_time) > _ANNOUNCEMENT_RING_TIMEOUT ): # Ring timeout + _LOGGER.debug("Ring timeout") self._announcement = None - self._check_announcement_ended_task = None - self._announcement_future.set_exception( + self._check_announcement_pickup_task = None + self._call_end_future.set_exception( TimeoutError("User did not pick up in time") ) _LOGGER.debug("Timed out waiting for the user to pick up the phone") break - - if (self._last_chunk_time is not None) and ( - (current_time - self._last_chunk_time) > _ANNOUNCEMENT_HANGUP_SEC - ): - # Caller hung up - self._announcement = None - self._announcement_future.set_result(None) - self._check_announcement_ended_task = None - _LOGGER.debug("Announcement ended") + if self._last_chunk_time is not None: + _LOGGER.debug("Picked up the phone") + self._check_announcement_pickup_task = None break - await asyncio.sleep(_ANNOUNCEMENT_HANGUP_SEC / 2) + await asyncio.sleep(_HANGUP_SEC / 2) + + async def _check_hangup(self) -> None: + """Continuously checks if an audio chunk was received within a time limit. + + If not, the caller is presumed to have hung up and the call is ended. + """ + try: + while True: + current_time = time.monotonic() + if (self._last_chunk_time is not None) and ( + (current_time - self._last_chunk_time) > _HANGUP_SEC + ): + # Caller hung up + _LOGGER.debug("Hang up") + self._announcement = None + if self._run_pipeline_task is not None: + _LOGGER.debug("Cancelling running pipeline") + self._run_pipeline_task.cancel() + self._call_end_future.set_result(None) + self.disconnect() + break + + await asyncio.sleep(_HANGUP_SEC / 2) + except asyncio.CancelledError: + # Don't swallow cancellation + if (current_task := asyncio.current_task()) and current_task.cancelling(): + raise + _LOGGER.debug("Check hangup cancelled") async def async_start_conversation( self, start_announcement: AssistSatelliteAnnouncement @@ -332,6 +357,24 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol # VoIP # ------------------------------------------------------------------------- + def disconnect(self): + """Server disconnected.""" + super().disconnect() + if self._check_hangup_task is not None: + self._check_hangup_task.cancel() + self._check_hangup_task = None + + def connection_made(self, transport): + """Server is ready.""" + super().connection_made(transport) + self._last_chunk_time = time.monotonic() + # Check if caller hung up + self._check_hangup_task = self.config_entry.async_create_background_task( + self.hass, + self._check_hangup(), + "voip_hangup", + ) + def on_chunk(self, audio_bytes: bytes) -> None: """Handle raw audio chunk.""" self._last_chunk_time = time.monotonic() @@ -368,13 +411,22 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol self.voip_device.set_is_active(True) async def stt_stream(): + retry: bool = True while True: - async with asyncio.timeout(self._audio_chunk_timeout): - chunk = await self._audio_queue.get() - if not chunk: - break + try: + async with asyncio.timeout(self._audio_chunk_timeout): + chunk = await self._audio_queue.get() + if not chunk: + _LOGGER.debug("STT stream got None") + break yield chunk + except TimeoutError: + _LOGGER.debug("STT Stream timed out") + if not retry: + _LOGGER.debug("No more retries, ending STT stream") + break + retry = False # Play listening tone at the start of each cycle await self._play_tone(Tones.LISTENING, silence_before=0.2) @@ -385,6 +437,7 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol ) if self._pipeline_had_error: + _LOGGER.debug("Pipeline error") self._pipeline_had_error = False await self._play_tone(Tones.ERROR) else: @@ -394,7 +447,14 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol # length of the TTS audio. await self._tts_done.wait() except TimeoutError: + # This shouldn't happen anymore, we are detecting hang ups with a separate task + _LOGGER.exception("Timeout error") self.disconnect() # caller hung up + except asyncio.CancelledError: + _LOGGER.debug("Pipeline cancelled") + # Don't swallow cancellation + if (current_task := asyncio.current_task()) and current_task.cancelling(): + raise finally: # Stop audio stream await self._audio_queue.put(None) @@ -433,8 +493,8 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol if self._run_pipeline_after_announce: # Clear announcement to allow pipeline to run + _LOGGER.debug("Clearing announcement") self._announcement = None - self._announcement_future.set_result(None) def _clear_audio_queue(self) -> None: """Ensure audio queue is empty.""" @@ -463,6 +523,7 @@ class VoipAssistSatellite(VoIPEntity, AssistSatelliteEntity, RtpDatagramProtocol ) else: # Empty TTS response + _LOGGER.debug("Empty TTS response") self._tts_done.set() elif event.type == PipelineEventType.ERROR: # Play error tone instead of wait for TTS when pipeline is finished. diff --git a/homeassistant/components/voip/manifest.json b/homeassistant/components/voip/manifest.json index dfd397fde14..09e1f112699 100644 --- a/homeassistant/components/voip/manifest.json +++ b/homeassistant/components/voip/manifest.json @@ -1,7 +1,7 @@ { "domain": "voip", "name": "Voice over IP", - "codeowners": ["@balloob", "@synesthesiam"], + "codeowners": ["@balloob", "@synesthesiam", "@jaminh"], "config_flow": true, "dependencies": ["assist_pipeline", "assist_satellite", "intent", "network"], "documentation": "https://www.home-assistant.io/integrations/voip", diff --git a/tests/components/voip/test_voip.py b/tests/components/voip/test_voip.py index 345f0399645..65567c8e1d1 100644 --- a/tests/components/voip/test_voip.py +++ b/tests/components/voip/test_voip.py @@ -335,9 +335,8 @@ async def test_pipeline( patch.object(satellite, "tts_response_finished", tts_response_finished), ): satellite._tones = Tones(0) - satellite.transport = Mock() + satellite.connection_made(Mock()) - satellite.connection_made(satellite.transport) assert satellite.state == AssistSatelliteState.IDLE # Ensure audio queue is cleared before pipeline starts @@ -473,7 +472,7 @@ async def test_tts_timeout( for tone in Tones: satellite._tone_bytes[tone] = tone_bytes - satellite.transport = Mock() + satellite.connection_made(Mock()) satellite.send_audio = Mock() original_send_tts = satellite._send_tts @@ -511,6 +510,7 @@ async def test_tts_wrong_extension( assert await async_setup_component(hass, "voip", {}) satellite = async_get_satellite_entity(hass, voip.DOMAIN, voip_device.voip_id) + satellite.addr = ("192.168.1.1", 12345) assert isinstance(satellite, VoipAssistSatellite) done = asyncio.Event() @@ -559,8 +559,6 @@ async def test_tts_wrong_extension( "homeassistant.components.assist_satellite.entity.async_pipeline_from_audio_stream", new=async_pipeline_from_audio_stream, ): - satellite.transport = Mock() - original_send_tts = satellite._send_tts async def send_tts(*args, **kwargs): @@ -572,6 +570,8 @@ async def test_tts_wrong_extension( satellite._send_tts = AsyncMock(side_effect=send_tts) # type: ignore[method-assign] + satellite.connection_made(Mock()) + # silence satellite.on_chunk(bytes(_ONE_SECOND)) @@ -579,10 +579,18 @@ async def test_tts_wrong_extension( satellite.on_chunk(bytes([255] * _ONE_SECOND * 2)) # silence (assumes relaxed VAD sensitivity) - satellite.on_chunk(bytes(_ONE_SECOND * 4)) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) # Wait for mock pipeline to exhaust the audio stream - async with asyncio.timeout(1): + async with asyncio.timeout(3): await done.wait() @@ -595,6 +603,7 @@ async def test_tts_wrong_wav_format( assert await async_setup_component(hass, "voip", {}) satellite = async_get_satellite_entity(hass, voip.DOMAIN, voip_device.voip_id) + satellite.addr = ("192.168.1.1", 12345) assert isinstance(satellite, VoipAssistSatellite) done = asyncio.Event() @@ -643,8 +652,6 @@ async def test_tts_wrong_wav_format( "homeassistant.components.assist_satellite.entity.async_pipeline_from_audio_stream", new=async_pipeline_from_audio_stream, ): - satellite.transport = Mock() - original_send_tts = satellite._send_tts async def send_tts(*args, **kwargs): @@ -656,6 +663,8 @@ async def test_tts_wrong_wav_format( satellite._send_tts = AsyncMock(side_effect=send_tts) # type: ignore[method-assign] + satellite.connection_made(Mock()) + # silence satellite.on_chunk(bytes(_ONE_SECOND)) @@ -663,10 +672,18 @@ async def test_tts_wrong_wav_format( satellite.on_chunk(bytes([255] * _ONE_SECOND * 2)) # silence (assumes relaxed VAD sensitivity) - satellite.on_chunk(bytes(_ONE_SECOND * 4)) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) # Wait for mock pipeline to exhaust the audio stream - async with asyncio.timeout(1): + async with asyncio.timeout(3): await done.wait() @@ -679,6 +696,7 @@ async def test_empty_tts_output( assert await async_setup_component(hass, "voip", {}) satellite = async_get_satellite_entity(hass, voip.DOMAIN, voip_device.voip_id) + satellite.addr = ("192.168.1.1", 12345) assert isinstance(satellite, VoipAssistSatellite) async def async_pipeline_from_audio_stream(*args, **kwargs): @@ -728,7 +746,7 @@ async def test_empty_tts_output( "homeassistant.components.voip.assist_satellite.VoipAssistSatellite._send_tts", ) as mock_send_tts, ): - satellite.transport = Mock() + satellite.connection_made(Mock()) # silence satellite.on_chunk(bytes(_ONE_SECOND)) @@ -737,10 +755,18 @@ async def test_empty_tts_output( satellite.on_chunk(bytes([255] * _ONE_SECOND * 2)) # silence (assumes relaxed VAD sensitivity) - satellite.on_chunk(bytes(_ONE_SECOND * 4)) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) # Wait for mock pipeline to finish - async with asyncio.timeout(1): + async with asyncio.timeout(2): await satellite._tts_done.wait() mock_send_tts.assert_not_called() @@ -785,7 +811,7 @@ async def test_pipeline_error( ), ): satellite._tones = Tones.ERROR - satellite.transport = Mock() + satellite.connection_made(Mock()) satellite._async_send_audio = AsyncMock(side_effect=async_send_audio) # type: ignore[method-assign] satellite.on_chunk(bytes(_ONE_SECOND)) @@ -845,16 +871,20 @@ async def test_announce( "homeassistant.components.voip.assist_satellite.VoipAssistSatellite._send_tts", ) as mock_send_tts, ): - satellite.transport = Mock() announce_task = hass.async_create_background_task( satellite.async_announce(announcement), "voip_announce" ) await asyncio.sleep(0) + satellite.connection_made(Mock()) mock_protocol.outgoing_call.assert_called_once() # Trigger announcement satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + async with asyncio.timeout(2): await announce_task mock_send_tts.assert_called_once_with( @@ -897,11 +927,11 @@ async def test_voip_id_is_ip_address( "homeassistant.components.voip.assist_satellite.VoipAssistSatellite._send_tts", ) as mock_send_tts, ): - satellite.transport = Mock() announce_task = hass.async_create_background_task( satellite.async_announce(announcement), "voip_announce" ) await asyncio.sleep(0) + satellite.connection_made(Mock()) mock_protocol.outgoing_call.assert_called_once() assert ( mock_protocol.outgoing_call.call_args.kwargs["destination"].host @@ -910,7 +940,11 @@ async def test_voip_id_is_ip_address( # Trigger announcement satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + async with asyncio.timeout(2): await announce_task mock_send_tts.assert_called_once_with( @@ -955,7 +989,7 @@ async def test_announce_timeout( 0.01, ), ): - satellite.transport = Mock() + satellite.connection_made(Mock()) with pytest.raises(TimeoutError): await satellite.async_announce(announcement) @@ -1042,7 +1076,7 @@ async def test_start_conversation( new=async_pipeline_from_audio_stream, ), ): - satellite.transport = Mock() + satellite.connection_made(Mock()) conversation_task = hass.async_create_background_task( satellite.async_start_conversation(announcement), "voip_start_conversation" ) @@ -1051,16 +1085,20 @@ async def test_start_conversation( # Trigger announcement and wait for it to finish satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + async with asyncio.timeout(2): await tts_sent.wait() - tts_sent.clear() - # Trigger pipeline satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): - # Wait for TTS - await tts_sent.wait() + await asyncio.sleep(0.2) + satellite.on_chunk(bytes(_ONE_SECOND)) + await asyncio.sleep(3) + async with asyncio.timeout(3): + # Wait for Conversation end await conversation_task @@ -1073,21 +1111,8 @@ async def test_start_conversation_user_doesnt_pick_up( """Test start conversation when the user doesn't pick up.""" assert await async_setup_component(hass, "voip", {}) - pipeline = assist_pipeline.Pipeline( - conversation_engine="test engine", - conversation_language="en", - language="en", - name="test pipeline", - stt_engine="test stt", - stt_language="en", - tts_engine="test tts", - tts_language="en", - tts_voice=None, - wake_word_entity=None, - wake_word_id=None, - ) - satellite = async_get_satellite_entity(hass, voip.DOMAIN, voip_device.voip_id) + satellite.addr = ("192.168.1.1", 12345) assert isinstance(satellite, VoipAssistSatellite) assert ( satellite.supported_features @@ -1098,62 +1123,22 @@ async def test_start_conversation_user_doesnt_pick_up( mock_protocol: AsyncMock = hass.data[DOMAIN].protocol mock_protocol.outgoing_call = Mock() - pipeline_started = asyncio.Event() - - async def async_pipeline_from_audio_stream( - hass: HomeAssistant, - context: Context, - *args, - conversation_extra_system_prompt: str | None = None, - **kwargs, - ): - # System prompt should be not be set due to timeout (user not picking up) - assert conversation_extra_system_prompt is None - - pipeline_started.set() + announcement = assist_satellite.AssistSatelliteAnnouncement( + message="test announcement", + media_id=_MEDIA_ID, + tts_token="test-token", + original_media_id=_MEDIA_ID, + media_id_source="tts", + ) + # Very short timeout which will trigger because we don't send any audio in with ( patch( - "homeassistant.components.assist_satellite.entity.async_get_pipeline", - return_value=pipeline, - ), - patch( - "homeassistant.components.voip.assist_satellite.VoipAssistSatellite.async_start_conversation", - side_effect=TimeoutError, - ), - patch( - "homeassistant.components.assist_satellite.entity.async_pipeline_from_audio_stream", - new=async_pipeline_from_audio_stream, - ), - patch( - "homeassistant.components.tts.generate_media_source_id", - return_value="media-source://bla", - ), - patch( - "homeassistant.components.tts.async_resolve_engine", - return_value="test tts", - ), - patch( - "homeassistant.components.tts.async_create_stream", - return_value=MockResultStream(hass, "wav", b""), + "homeassistant.components.voip.assist_satellite._ANNOUNCEMENT_RING_TIMEOUT", + 0.1, ), ): - satellite.transport = Mock() + satellite.connection_made(Mock()) - # Error should clear system prompt with pytest.raises(TimeoutError): - await hass.services.async_call( - assist_satellite.DOMAIN, - "start_conversation", - { - "entity_id": satellite.entity_id, - "start_message": "test announcement", - "extra_system_prompt": "test prompt", - }, - blocking=True, - ) - - # Trigger a pipeline so we can check if the system prompt was cleared - satellite.on_chunk(bytes(_ONE_SECOND)) - async with asyncio.timeout(1): - await pipeline_started.wait() + await satellite.async_start_conversation(announcement)