diff --git a/tests/components/stream/conftest.py b/tests/components/stream/conftest.py index 1b017667ee6..ead2018b528 100644 --- a/tests/components/stream/conftest.py +++ b/tests/components/stream/conftest.py @@ -32,6 +32,7 @@ class WorkerSync: def resume(self): """Allow the worker thread to finalize the stream.""" + logging.debug("waking blocked worker") self._event.set() def blocking_finish(self, stream: Stream): diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index b554ee6b20a..ab0c21efdfb 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -20,6 +20,8 @@ from tests.components.stream.common import generate_h264_video STREAM_SOURCE = "some-stream-source" SEQUENCE_BYTES = io.BytesIO(b"some-bytes") DURATION = 10 +TEST_TIMEOUT = 5.0 # Lower than 9s home assistant timeout +MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever class HlsClient: @@ -187,39 +189,6 @@ async def test_stream_timeout_after_stop(hass, hass_client, stream_worker_sync): await hass.async_block_till_done() -async def test_stream_ended(hass, stream_worker_sync): - """Test hls stream packets ended.""" - await async_setup_component(hass, "stream", {"stream": {}}) - - stream_worker_sync.pause() - - # Setup demo HLS track - source = generate_h264_video() - stream = create_stream(hass, source) - track = stream.add_provider("hls") - - # Request stream - stream.add_provider("hls") - stream.start() - stream.endpoint_url("hls") - - # Run it dead - while True: - segment = await track.recv() - if segment is None: - break - segments = segment.sequence - # Allow worker to finalize once enough of the stream is been consumed - if segments > 1: - stream_worker_sync.resume() - - assert segments > 1 - assert not track.get_segment() - - # Stop stream, if it hasn't quit already - stream.stop() - - async def test_stream_keepalive(hass): """Test hls stream retries the stream when keepalive=True.""" await async_setup_component(hass, "stream", {"stream": {}}) diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index 48fe48d3337..2b44c16243b 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -1,10 +1,13 @@ """The tests for hls streams.""" +import asyncio from datetime import timedelta import logging import os import threading +from typing import Deque from unittest.mock import patch +import async_timeout import av import pytest @@ -18,7 +21,8 @@ import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed from tests.components.stream.common import generate_h264_video -TEST_TIMEOUT = 10 +TEST_TIMEOUT = 7.0 # Lower than 9s home assistant timeout +MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever class SaveRecordWorkerSync: @@ -32,23 +36,33 @@ class SaveRecordWorkerSync: def __init__(self): """Initialize SaveRecordWorkerSync.""" self.reset() + self._segments = None - def recorder_save_worker(self, *args, **kwargs): + def recorder_save_worker(self, file_out: str, segments: Deque[Segment]): """Mock method for patch.""" logging.debug("recorder_save_worker thread started") assert self._save_thread is None + self._segments = segments self._save_thread = threading.current_thread() self._save_event.set() - def join(self): + async def get_segments(self): + """Return the recorded video segments.""" + with async_timeout.timeout(TEST_TIMEOUT): + await self._save_event.wait() + return self._segments + + async def join(self): """Verify save worker was invoked and block on shutdown.""" - assert self._save_event.wait(timeout=TEST_TIMEOUT) - self._save_thread.join() + with async_timeout.timeout(TEST_TIMEOUT): + await self._save_event.wait() + self._save_thread.join(timeout=TEST_TIMEOUT) + assert not self._save_thread.is_alive() def reset(self): """Reset callback state for reuse in tests.""" self._save_thread = None - self._save_event = threading.Event() + self._save_event = asyncio.Event() @pytest.fixture() @@ -63,7 +77,7 @@ def record_worker_sync(hass): yield sync -async def test_record_stream(hass, hass_client, stream_worker_sync, record_worker_sync): +async def test_record_stream(hass, hass_client, record_worker_sync): """ Test record stream. @@ -73,29 +87,21 @@ async def test_record_stream(hass, hass_client, stream_worker_sync, record_worke """ await async_setup_component(hass, "stream", {"stream": {}}) - stream_worker_sync.pause() - # Setup demo track source = generate_h264_video() stream = create_stream(hass, source) with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - recorder = stream.add_provider("recorder") - while True: - segment = await recorder.recv() - if not segment: - break - segments = segment.sequence - if segments > 1: - stream_worker_sync.resume() - - stream.stop() - assert segments > 1 + # After stream decoding finishes, the record worker thread starts + segments = await record_worker_sync.get_segments() + assert len(segments) >= 1 # Verify that the save worker was invoked, then block until its # thread completes and is shutdown completely to avoid thread leaks. - record_worker_sync.join() + await record_worker_sync.join() + + stream.stop() async def test_record_lookback( @@ -250,4 +256,4 @@ async def test_record_stream_audio( # Verify that the save worker was invoked, then block until its # thread completes and is shutdown completely to avoid thread leaks. - record_worker_sync.join() + await record_worker_sync.join()