Bump aioesphomeapi and adjust handle_stop (#125907)

* Bump aioesphomeapi and adjust handle_stop

* Stop audio stream too

* Update homeassistant/components/esphome/assist_satellite.py

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>

---------

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
This commit is contained in:
Michael Hansen 2024-09-13 15:05:11 -05:00 committed by GitHub
parent 94916ebbd1
commit cabaf37437
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 100 additions and 13 deletions

View File

@ -350,9 +350,12 @@ class EsphomeAssistSatellite(
"""Handle incoming audio chunk from API."""
self._audio_queue.put_nowait(data)
async def handle_pipeline_stop(self) -> None:
async def handle_pipeline_stop(self, abort: bool) -> None:
"""Handle request for pipeline to stop."""
self._stop_pipeline()
if abort:
self._abort_pipeline()
else:
self._stop_pipeline()
def handle_pipeline_finished(self) -> None:
"""Handle when pipeline has finished running."""
@ -466,10 +469,17 @@ class EsphomeAssistSatellite(
yield chunk
def _stop_pipeline(self) -> None:
"""Request pipeline to be stopped."""
"""Request pipeline to be stopped by ending the audio stream and continue processing."""
self._audio_queue.put_nowait(None)
_LOGGER.debug("Requested pipeline stop")
def _abort_pipeline(self) -> None:
"""Request pipeline to be aborted (no further processing)."""
_LOGGER.debug("Requested pipeline abort")
self._audio_queue.put_nowait(None)
if self._pipeline_task is not None:
self._pipeline_task.cancel()
async def _start_udp_server(self) -> int:
"""Start a UDP server on a random free port."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

View File

@ -17,7 +17,7 @@
"mqtt": ["esphome/discover/#"],
"quality_scale": "platinum",
"requirements": [
"aioesphomeapi==25.4.0",
"aioesphomeapi==26.0.0",
"esphome-dashboard-api==1.2.3",
"bleak-esphome==1.0.0"
],

View File

@ -240,7 +240,7 @@ aioelectricitymaps==0.4.0
aioemonitor==1.0.5
# homeassistant.components.esphome
aioesphomeapi==25.4.0
aioesphomeapi==26.0.0
# homeassistant.components.flo
aioflo==2021.11.0

View File

@ -228,7 +228,7 @@ aioelectricitymaps==0.4.0
aioemonitor==1.0.5
# homeassistant.components.esphome
aioesphomeapi==25.4.0
aioesphomeapi==26.0.0
# homeassistant.components.flo
aioflo==2021.11.0

View File

@ -205,7 +205,7 @@ class MockESPHomeDevice:
Coroutine[Any, Any, int | None],
]
self.voice_assistant_handle_stop_callback: Callable[
[], Coroutine[Any, Any, None]
[bool], Coroutine[Any, Any, None]
]
self.voice_assistant_handle_audio_callback: (
Callable[
@ -287,7 +287,7 @@ class MockESPHomeDevice:
[str, int, VoiceAssistantAudioSettings, str | None],
Coroutine[Any, Any, int | None],
],
handle_stop: Callable[[], Coroutine[Any, Any, None]],
handle_stop: Callable[[bool], Coroutine[Any, Any, None]],
handle_audio: (
Callable[
[bytes],
@ -313,9 +313,9 @@ class MockESPHomeDevice:
conversation_id, flags, settings, wake_word_phrase
)
async def mock_voice_assistant_handle_stop(self) -> None:
async def mock_voice_assistant_handle_stop(self, abort: bool) -> None:
"""Mock voice assistant handle stop."""
await self.voice_assistant_handle_stop_callback()
await self.voice_assistant_handle_stop_callback(abort)
async def mock_voice_assistant_handle_audio(self, audio: bytes) -> None:
"""Mock voice assistant handle audio."""
@ -394,7 +394,7 @@ async def _mock_generic_device_entry(
[str, int, VoiceAssistantAudioSettings, str | None],
Coroutine[Any, Any, int | None],
],
handle_stop: Callable[[], Coroutine[Any, Any, None]],
handle_stop: Callable[[bool], Coroutine[Any, Any, None]],
handle_audio: (
Callable[
[bytes],

View File

@ -368,7 +368,7 @@ async def test_pipeline_api_audio(
)
mock_tts_streaming_task.cancel.assert_called_once()
await satellite.handle_audio(b"test-mic")
await satellite.handle_pipeline_stop()
await satellite.handle_pipeline_stop(abort=False)
await pipeline_finished.wait()
await tts_finished.wait()
@ -563,7 +563,7 @@ async def test_pipeline_udp_audio(
# Wait for audio chunk to be delivered
await mic_audio_event.wait()
await satellite.handle_pipeline_stop()
await satellite.handle_pipeline_stop(abort=False)
await pipeline_finished.wait()
await tts_finished.wait()
@ -1073,3 +1073,80 @@ async def test_satellite_unloaded_on_disconnect(
state = hass.states.get(satellite.entity_id)
assert state is not None
assert state.state == STATE_UNAVAILABLE
async def test_pipeline_abort(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: Callable[
[APIClient, list[EntityInfo], list[UserService], list[EntityState]],
Awaitable[MockESPHomeDevice],
],
) -> None:
"""Test aborting a pipeline (no further processing)."""
mock_device: MockESPHomeDevice = await mock_esphome_device(
mock_client=mock_client,
entity_info=[],
user_service=[],
states=[],
device_info={
"voice_assistant_feature_flags": VoiceAssistantFeature.VOICE_ASSISTANT
| VoiceAssistantFeature.API_AUDIO
},
)
await hass.async_block_till_done()
satellite = get_satellite_entity(hass, mock_device.device_info.mac_address)
assert satellite is not None
chunks = []
chunk_received = asyncio.Event()
pipeline_aborted = asyncio.Event()
async def async_pipeline_from_audio_stream(*args, **kwargs):
stt_stream = kwargs["stt_stream"]
try:
async for chunk in stt_stream:
chunks.append(chunk)
chunk_received.set()
except asyncio.CancelledError:
# Aborting cancels the pipeline task
pipeline_aborted.set()
raise
pipeline_finished = asyncio.Event()
original_handle_pipeline_finished = satellite.handle_pipeline_finished
def handle_pipeline_finished():
original_handle_pipeline_finished()
pipeline_finished.set()
with (
patch(
"homeassistant.components.assist_satellite.entity.async_pipeline_from_audio_stream",
new=async_pipeline_from_audio_stream,
),
patch.object(satellite, "handle_pipeline_finished", handle_pipeline_finished),
):
async with asyncio.timeout(1):
await satellite.handle_pipeline_start(
conversation_id="",
flags=VoiceAssistantCommandFlag(0), # stt
audio_settings=VoiceAssistantAudioSettings(),
wake_word_phrase="",
)
await satellite.handle_audio(b"before-abort")
await chunk_received.wait()
# Abort the pipeline, no further processing
await satellite.handle_pipeline_stop(abort=True)
await pipeline_aborted.wait()
# This chunk should not make it into the STT stream
await satellite.handle_audio(b"after-abort")
await pipeline_finished.wait()
# Only first chunk
assert chunks == [b"before-abort"]