diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index 420e7c654c5..cf923de85c2 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -1,4 +1,5 @@ """Provide functionality to record stream.""" +import logging import os import threading from typing import List @@ -9,6 +10,8 @@ from homeassistant.core import callback from .core import PROVIDERS, Segment, StreamOutput +_LOGGER = logging.getLogger(__name__) + @callback def async_setup_recorder(hass): @@ -109,6 +112,7 @@ class RecorderOutput(StreamOutput): def cleanup(self): """Write recording and clean up.""" + _LOGGER.debug("Starting recorder worker thread") thread = threading.Thread( name="recorder_save_worker", target=recorder_save_worker, diff --git a/tests/components/stream/conftest.py b/tests/components/stream/conftest.py new file mode 100644 index 00000000000..1b2f0645f9b --- /dev/null +++ b/tests/components/stream/conftest.py @@ -0,0 +1,60 @@ +"""Test fixtures for the stream component. + +The tests encode stream (as an h264 video), then load the stream and verify +that it is decoded properly. The background worker thread responsible for +decoding will decode the stream as fast as possible, and when completed +clears all output buffers. This can be a problem for the test that wishes +to retrieve and verify decoded segments. If the worker finishes first, there is +nothing for the test to verify. The solution is the WorkerSync class that +allows the tests to pause the worker thread before finalizing the stream +so that it can inspect the output. +""" + +import logging +import threading +from unittest.mock import patch + +import pytest + +from homeassistant.components.stream.core import Segment, StreamOutput + + +class WorkerSync: + """Test fixture that intercepts stream worker calls to StreamOutput.""" + + def __init__(self): + """Initialize WorkerSync.""" + self._event = None + self._put_original = StreamOutput.put + + def pause(self): + """Pause the worker before it finalizes the stream.""" + self._event = threading.Event() + + def resume(self): + """Allow the worker thread to finalize the stream.""" + self._event.set() + + def blocking_put(self, stream_output: StreamOutput, segment: Segment): + """Proxy StreamOutput.put, intercepted for test to pause worker.""" + if segment is None and self._event: + # Worker is ending the stream, which clears all output buffers. + # Block the worker thread until the test has a chance to verify + # the segments under test. + logging.error("blocking worker") + self._event.wait() + + # Forward to actual StreamOutput.put + self._put_original(stream_output, segment) + + +@pytest.fixture() +def stream_worker_sync(hass): + """Patch StreamOutput to allow test to synchronize worker stream end.""" + sync = WorkerSync() + with patch( + "homeassistant.components.stream.core.StreamOutput.put", + side_effect=sync.blocking_put, + autospec=True, + ): + yield sync diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index e19a3b96687..790222b1630 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -1,24 +1,11 @@ -"""The tests for hls streams. - -The tests encode stream (as an h264 video), then load the stream and verify -that it is decoded properly. The background worker thread responsible for -decoding will decode the stream as fast as possible, and when completed -clears all output buffers. This can be a problem for the test that wishes -to retrieve and verify decoded segments. If the worker finishes first, there is -nothing for the test to verify. The solution is the WorkerSync class that -allows the tests to pause the worker thread before finalizing the stream -so that it can inspect the output. -""" +"""The tests for hls streams.""" from datetime import timedelta -import threading from unittest.mock import patch from urllib.parse import urlparse import av -import pytest from homeassistant.components.stream import request_stream -from homeassistant.components.stream.core import Segment, StreamOutput from homeassistant.const import HTTP_NOT_FOUND from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util @@ -27,47 +14,7 @@ from tests.common import async_fire_time_changed from tests.components.stream.common import generate_h264_video, preload_stream -class WorkerSync: - """Test fixture that intercepts stream worker calls to StreamOutput.""" - - def __init__(self): - """Initialize WorkerSync.""" - self._event = None - self._put_original = StreamOutput.put - - def pause(self): - """Pause the worker before it finalizes the stream.""" - self._event = threading.Event() - - def resume(self): - """Allow the worker thread to finalize the stream.""" - self._event.set() - - def blocking_put(self, stream_output: StreamOutput, segment: Segment): - """Proxy StreamOutput.put, intercepted for test to pause worker.""" - if segment is None and self._event: - # Worker is ending the stream, which clears all output buffers. - # Block the worker thread until the test has a chance to verify - # the segments under test. - self._event.wait() - - # Forward to actual StreamOutput.put - self._put_original(stream_output, segment) - - -@pytest.fixture() -def worker_sync(hass): - """Patch StreamOutput to allow test to synchronize worker stream end.""" - sync = WorkerSync() - with patch( - "homeassistant.components.stream.core.StreamOutput.put", - side_effect=sync.blocking_put, - autospec=True, - ): - yield sync - - -async def test_hls_stream(hass, hass_client, worker_sync): +async def test_hls_stream(hass, hass_client, stream_worker_sync): """ Test hls stream. @@ -76,7 +23,7 @@ async def test_hls_stream(hass, hass_client, worker_sync): """ await async_setup_component(hass, "stream", {"stream": {}}) - worker_sync.pause() + stream_worker_sync.pause() # Setup demo HLS track source = generate_h264_video() @@ -107,7 +54,7 @@ async def test_hls_stream(hass, hass_client, worker_sync): segment_response = await http_client.get(segment_url) assert segment_response.status == 200 - worker_sync.resume() + stream_worker_sync.resume() # Stop stream, if it hasn't quit already stream.stop() @@ -117,11 +64,11 @@ async def test_hls_stream(hass, hass_client, worker_sync): assert fail_response.status == HTTP_NOT_FOUND -async def test_stream_timeout(hass, hass_client, worker_sync): +async def test_stream_timeout(hass, hass_client, stream_worker_sync): """Test hls stream timeout.""" await async_setup_component(hass, "stream", {"stream": {}}) - worker_sync.pause() + stream_worker_sync.pause() # Setup demo HLS track source = generate_h264_video() @@ -146,7 +93,7 @@ async def test_stream_timeout(hass, hass_client, worker_sync): playlist_response = await http_client.get(parsed_url.path) assert playlist_response.status == 200 - worker_sync.resume() + stream_worker_sync.resume() # Wait 5 minutes future = dt_util.utcnow() + timedelta(minutes=5) @@ -157,11 +104,11 @@ async def test_stream_timeout(hass, hass_client, worker_sync): assert fail_response.status == HTTP_NOT_FOUND -async def test_stream_ended(hass, worker_sync): +async def test_stream_ended(hass, stream_worker_sync): """Test hls stream packets ended.""" await async_setup_component(hass, "stream", {"stream": {}}) - worker_sync.pause() + stream_worker_sync.pause() # Setup demo HLS track source = generate_h264_video() @@ -179,7 +126,7 @@ async def test_stream_ended(hass, worker_sync): segments = segment.sequence # Allow worker to finalize once enough of the stream is been consumed if segments > 1: - worker_sync.resume() + stream_worker_sync.resume() assert segments > 1 assert not track.get_segment() diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index 220d0a062e3..1b46738c8f2 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -1,6 +1,8 @@ """The tests for hls streams.""" from datetime import timedelta -from io import BytesIO +import logging +import os +import threading from unittest.mock import patch import av @@ -14,40 +16,96 @@ import homeassistant.util.dt as dt_util from tests.common import async_fire_time_changed from tests.components.stream.common import generate_h264_video, preload_stream +TEST_TIMEOUT = 10 -@pytest.mark.skip("Flaky in CI") -async def test_record_stream(hass, hass_client): + +class SaveRecordWorkerSync: + """ + Test fixture to manage RecordOutput thread for recorder_save_worker. + + This is used to assert that the worker is started and stopped cleanly + to avoid thread leaks in tests. + """ + + def __init__(self): + """Initialize SaveRecordWorkerSync.""" + self.reset() + + def recorder_save_worker(self, *args, **kwargs): + """Mock method for patch.""" + logging.debug("recorder_save_worker thread started") + assert self._save_thread is None + self._save_thread = threading.current_thread() + self._save_event.set() + + def join(self): + """Verify save worker was invoked and block on shutdown.""" + assert self._save_event.wait(timeout=TEST_TIMEOUT) + self._save_thread.join() + + def reset(self): + """Reset callback state for reuse in tests.""" + self._save_thread = None + self._save_event = threading.Event() + + +@pytest.fixture() +def record_worker_sync(hass): + """Patch recorder_save_worker for clean thread shutdown for test.""" + sync = SaveRecordWorkerSync() + with patch( + "homeassistant.components.stream.recorder.recorder_save_worker", + side_effect=sync.recorder_save_worker, + autospec=True, + ): + yield sync + + +async def test_record_stream(hass, hass_client, stream_worker_sync, record_worker_sync): """ Test record stream. - Purposefully not mocking anything here to test full - integration with the stream component. + Tests full integration with the stream component, and captures the + stream worker and save worker to allow for clean shutdown of background + threads. The actual save logic is tested in test_recorder_save below. """ await async_setup_component(hass, "stream", {"stream": {}}) - with patch("homeassistant.components.stream.recorder.recorder_save_worker"): - # Setup demo track - source = generate_h264_video() - stream = preload_stream(hass, source) - recorder = stream.add_provider("recorder") - stream.start() + stream_worker_sync.pause() - while True: - segment = await recorder.recv() - if not segment: - break - segments = segment.sequence + # Setup demo track + source = generate_h264_video() + stream = preload_stream(hass, source) + recorder = stream.add_provider("recorder") + stream.start() - stream.stop() + while True: + segment = await recorder.recv() + if not segment: + break + segments = segment.sequence + if segments > 1: + stream_worker_sync.resume() - assert segments > 1 + stream.stop() + assert segments > 1 + + # Verify that the save worker was invoked, then block until its + # thread completes and is shutdown completely to avoid thread leaks. + record_worker_sync.join() -@pytest.mark.skip("Flaky in CI") -async def test_recorder_timeout(hass, hass_client): - """Test recorder timeout.""" +async def test_recorder_timeout(hass, hass_client, stream_worker_sync): + """ + Test recorder timeout. + + Mocks out the cleanup to assert that it is invoked after a timeout. + This test does not start the recorder save thread. + """ await async_setup_component(hass, "stream", {"stream": {}}) + stream_worker_sync.pause() + with patch( "homeassistant.components.stream.recorder.RecorderOutput.cleanup" ) as mock_cleanup: @@ -66,24 +124,28 @@ async def test_recorder_timeout(hass, hass_client): assert mock_cleanup.called + stream_worker_sync.resume() + stream.stop() + await hass.async_block_till_done() + await hass.async_block_till_done() -@pytest.mark.skip("Flaky in CI") -async def test_recorder_save(): + +async def test_recorder_save(tmpdir): """Test recorder save.""" # Setup source = generate_h264_video() - output = BytesIO() - output.name = "test.mp4" + filename = f"{tmpdir}/test.mp4" # Run - recorder_save_worker(output, [Segment(1, source, 4)], "mp4") + recorder_save_worker(filename, [Segment(1, source, 4)], "mp4") # Assert - assert output.getvalue() + assert os.path.exists(filename) -@pytest.mark.skip("Flaky in CI") -async def test_record_stream_audio(hass, hass_client): +async def test_record_stream_audio( + hass, hass_client, stream_worker_sync, record_worker_sync +): """ Test treatment of different audio inputs. @@ -98,23 +160,31 @@ async def test_record_stream_audio(hass, hass_client): ("empty", 0), # audio stream with no packets (None, 0), # no audio stream ): - with patch("homeassistant.components.stream.recorder.recorder_save_worker"): - # Setup demo track - source = generate_h264_video( - container_format="mov", audio_codec=a_codec - ) # mov can store PCM - stream = preload_stream(hass, source) - recorder = stream.add_provider("recorder") - stream.start() + record_worker_sync.reset() + stream_worker_sync.pause() - while True: - segment = await recorder.recv() - if not segment: - break - last_segment = segment + # Setup demo track + source = generate_h264_video( + container_format="mov", audio_codec=a_codec + ) # mov can store PCM + stream = preload_stream(hass, source) + recorder = stream.add_provider("recorder") + stream.start() - result = av.open(last_segment.segment, "r", format="mp4") + while True: + segment = await recorder.recv() + if not segment: + break + last_segment = segment + stream_worker_sync.resume() - assert len(result.streams.audio) == expected_audio_streams - result.close() - stream.stop() + result = av.open(last_segment.segment, "r", format="mp4") + + assert len(result.streams.audio) == expected_audio_streams + result.close() + stream.stop() + await hass.async_block_till_done() + + # Verify that the save worker was invoked, then block until its + # thread completes and is shutdown completely to avoid thread leaks. + record_worker_sync.join()