Add timeouts in stream tests to prevent possible hangs (#47545)

* Add timeouts on recving packets

Add a timeout when recving packets from the worker thread in case it hangs.
Add an exit condition just in case the while loop goes on forever.

* Add a timeout to recorder thread join.

* Wait for recorder thread to be invoked in tests

Remove the while loop and instead wait for segments to be produced by the background worker thread.

* Allow worker to resume before stopping to fix timeouts

* Lower test timeout further

* Remove test_stream_ended since it is flaky

This test doesn't really add additional value on top of other tests.
This commit is contained in:
Allen Porter 2021-03-13 01:53:26 -08:00 committed by GitHub
parent 30f99177c7
commit 02a82d3f00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 55 deletions

View File

@ -32,6 +32,7 @@ class WorkerSync:
def resume(self): def resume(self):
"""Allow the worker thread to finalize the stream.""" """Allow the worker thread to finalize the stream."""
logging.debug("waking blocked worker")
self._event.set() self._event.set()
def blocking_finish(self, stream: Stream): def blocking_finish(self, stream: Stream):

View File

@ -20,6 +20,8 @@ from tests.components.stream.common import generate_h264_video
STREAM_SOURCE = "some-stream-source" STREAM_SOURCE = "some-stream-source"
SEQUENCE_BYTES = io.BytesIO(b"some-bytes") SEQUENCE_BYTES = io.BytesIO(b"some-bytes")
DURATION = 10 DURATION = 10
TEST_TIMEOUT = 5.0 # Lower than 9s home assistant timeout
MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever
class HlsClient: class HlsClient:
@ -187,39 +189,6 @@ async def test_stream_timeout_after_stop(hass, hass_client, stream_worker_sync):
await hass.async_block_till_done() await hass.async_block_till_done()
async def test_stream_ended(hass, stream_worker_sync):
"""Test hls stream packets ended."""
await async_setup_component(hass, "stream", {"stream": {}})
stream_worker_sync.pause()
# Setup demo HLS track
source = generate_h264_video()
stream = create_stream(hass, source)
track = stream.add_provider("hls")
# Request stream
stream.add_provider("hls")
stream.start()
stream.endpoint_url("hls")
# Run it dead
while True:
segment = await track.recv()
if segment is None:
break
segments = segment.sequence
# Allow worker to finalize once enough of the stream is been consumed
if segments > 1:
stream_worker_sync.resume()
assert segments > 1
assert not track.get_segment()
# Stop stream, if it hasn't quit already
stream.stop()
async def test_stream_keepalive(hass): async def test_stream_keepalive(hass):
"""Test hls stream retries the stream when keepalive=True.""" """Test hls stream retries the stream when keepalive=True."""
await async_setup_component(hass, "stream", {"stream": {}}) await async_setup_component(hass, "stream", {"stream": {}})

View File

@ -1,10 +1,13 @@
"""The tests for hls streams.""" """The tests for hls streams."""
import asyncio
from datetime import timedelta from datetime import timedelta
import logging import logging
import os import os
import threading import threading
from typing import Deque
from unittest.mock import patch from unittest.mock import patch
import async_timeout
import av import av
import pytest import pytest
@ -18,7 +21,8 @@ import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed from tests.common import async_fire_time_changed
from tests.components.stream.common import generate_h264_video from tests.components.stream.common import generate_h264_video
TEST_TIMEOUT = 10 TEST_TIMEOUT = 7.0 # Lower than 9s home assistant timeout
MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever
class SaveRecordWorkerSync: class SaveRecordWorkerSync:
@ -32,23 +36,33 @@ class SaveRecordWorkerSync:
def __init__(self): def __init__(self):
"""Initialize SaveRecordWorkerSync.""" """Initialize SaveRecordWorkerSync."""
self.reset() self.reset()
self._segments = None
def recorder_save_worker(self, *args, **kwargs): def recorder_save_worker(self, file_out: str, segments: Deque[Segment]):
"""Mock method for patch.""" """Mock method for patch."""
logging.debug("recorder_save_worker thread started") logging.debug("recorder_save_worker thread started")
assert self._save_thread is None assert self._save_thread is None
self._segments = segments
self._save_thread = threading.current_thread() self._save_thread = threading.current_thread()
self._save_event.set() self._save_event.set()
def join(self): async def get_segments(self):
"""Return the recorded video segments."""
with async_timeout.timeout(TEST_TIMEOUT):
await self._save_event.wait()
return self._segments
async def join(self):
"""Verify save worker was invoked and block on shutdown.""" """Verify save worker was invoked and block on shutdown."""
assert self._save_event.wait(timeout=TEST_TIMEOUT) with async_timeout.timeout(TEST_TIMEOUT):
self._save_thread.join() await self._save_event.wait()
self._save_thread.join(timeout=TEST_TIMEOUT)
assert not self._save_thread.is_alive()
def reset(self): def reset(self):
"""Reset callback state for reuse in tests.""" """Reset callback state for reuse in tests."""
self._save_thread = None self._save_thread = None
self._save_event = threading.Event() self._save_event = asyncio.Event()
@pytest.fixture() @pytest.fixture()
@ -63,7 +77,7 @@ def record_worker_sync(hass):
yield sync yield sync
async def test_record_stream(hass, hass_client, stream_worker_sync, record_worker_sync): async def test_record_stream(hass, hass_client, record_worker_sync):
""" """
Test record stream. Test record stream.
@ -73,29 +87,21 @@ async def test_record_stream(hass, hass_client, stream_worker_sync, record_worke
""" """
await async_setup_component(hass, "stream", {"stream": {}}) await async_setup_component(hass, "stream", {"stream": {}})
stream_worker_sync.pause()
# Setup demo track # Setup demo track
source = generate_h264_video() source = generate_h264_video()
stream = create_stream(hass, source) stream = create_stream(hass, source)
with patch.object(hass.config, "is_allowed_path", return_value=True): with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path") await stream.async_record("/example/path")
recorder = stream.add_provider("recorder") # After stream decoding finishes, the record worker thread starts
while True: segments = await record_worker_sync.get_segments()
segment = await recorder.recv() assert len(segments) >= 1
if not segment:
break
segments = segment.sequence
if segments > 1:
stream_worker_sync.resume()
stream.stop()
assert segments > 1
# Verify that the save worker was invoked, then block until its # Verify that the save worker was invoked, then block until its
# thread completes and is shutdown completely to avoid thread leaks. # thread completes and is shutdown completely to avoid thread leaks.
record_worker_sync.join() await record_worker_sync.join()
stream.stop()
async def test_record_lookback( async def test_record_lookback(
@ -250,4 +256,4 @@ async def test_record_stream_audio(
# Verify that the save worker was invoked, then block until its # Verify that the save worker was invoked, then block until its
# thread completes and is shutdown completely to avoid thread leaks. # thread completes and is shutdown completely to avoid thread leaks.
record_worker_sync.join() await record_worker_sync.join()