From 0a128d006f0c0de4bdfe2acda66beb8fa5731463 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Sat, 12 Feb 2022 20:59:11 -0800 Subject: [PATCH] Improve stream robustness by always retrying worker (#66417) Improve stream robustness by always retrying in the worker on failure, rather than only when keepalive is enabled. This will make cloud cameras like Nest more robust, since they have a tendency to be flaky. This is also needed to improve client-side retry behavior because when the client attempts to retry, the stream token is already revoked because the worker stopped. The worker will still time out when idle if no streams are present, so it won't go on forever if no frontend is viewing the stream. --- homeassistant/components/stream/__init__.py | 7 ++++++- tests/components/stream/conftest.py | 12 +++++++++++- tests/components/stream/test_hls.py | 13 ++++++++----- tests/components/stream/test_worker.py | 4 ++-- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index be166d13455..365ea946f51 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -342,7 +342,7 @@ class Stream: self._logger.error("Error from stream worker: %s", str(err)) stream_state.discontinuity() - if not self.keepalive or self._thread_quit.is_set(): + if not _should_retry() or self._thread_quit.is_set(): if self._fast_restart_once: # The stream source is updated, restart without any delay. 
self._fast_restart_once = False @@ -446,3 +446,8 @@ class Stream: return await self._keyframe_converter.async_get_image( width=width, height=height ) + + +def _should_retry() -> bool: + """Return true if worker failures should be retried, for disabling during tests.""" + return True diff --git a/tests/components/stream/conftest.py b/tests/components/stream/conftest.py index c754903965a..407b144267b 100644 --- a/tests/components/stream/conftest.py +++ b/tests/components/stream/conftest.py @@ -16,7 +16,8 @@ from collections import deque from http import HTTPStatus import logging import threading -from unittest.mock import patch +from typing import Generator +from unittest.mock import Mock, patch from aiohttp import web import async_timeout @@ -219,6 +220,15 @@ def hls_sync(): yield sync +@pytest.fixture(autouse=True) +def should_retry() -> Generator[Mock, None, None]: + """Fixture to disable stream worker retries in tests by default.""" + with patch( + "homeassistant.components.stream._should_retry", return_value=False + ) as mock_should_retry: + yield mock_should_retry + + @pytest.fixture(scope="package") def h264_video(): """Generate a video, shared across tests.""" diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index 0492fec14f0..a2bb328826d 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -1,6 +1,7 @@ """The tests for hls streams.""" from datetime import timedelta from http import HTTPStatus +import logging from unittest.mock import patch from urllib.parse import urlparse @@ -252,8 +253,8 @@ async def test_stream_timeout_after_stop( await hass.async_block_till_done() -async def test_stream_keepalive(hass, setup_component): - """Test hls stream retries the stream when keepalive=True.""" +async def test_stream_retries(hass, setup_component, should_retry): + """Test hls stream is retried on failure.""" # Setup demo HLS track source = "test_stream_keepalive_source" stream = 
create_stream(hass, source, {}) @@ -271,9 +272,11 @@ async def test_stream_keepalive(hass, setup_component): cur_time = 0 def time_side_effect(): + logging.info("time side effect") nonlocal cur_time if cur_time >= 80: - stream.keepalive = False # Thread should exit and be joinable. + logging.info("changing return value") + should_retry.return_value = False # Thread should exit and be joinable. cur_time += 40 return cur_time @@ -284,8 +287,8 @@ async def test_stream_keepalive(hass, setup_component): ): av_open.side_effect = av.error.InvalidDataError(-2, "error") mock_time.time.side_effect = time_side_effect - # Request stream - stream.keepalive = True + # Request stream. Enable retries which are disabled by default in tests. + should_retry.return_value = True stream.start() stream._thread.join() stream._thread = None diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index b54c8dc3472..e5477c66f52 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -669,8 +669,8 @@ async def test_update_stream_source(hass): stream = Stream(hass, STREAM_SOURCE, {}) stream.add_provider(HLS_PROVIDER) - # Note that keepalive is not set here. The stream is "restarted" even though - # it is not stopping due to failure. + # Note that retries are disabled by default in tests, however the stream is "restarted" when + # the stream source is updated. py_av = MockPyAv() py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH)