diff --git a/homeassistant/components/nest/camera_sdm.py b/homeassistant/components/nest/camera_sdm.py index 8f5ba88fdd4..cc2730fad8a 100644 --- a/homeassistant/components/nest/camera_sdm.py +++ b/homeassistant/components/nest/camera_sdm.py @@ -146,13 +146,9 @@ class NestCamera(Camera): # Next attempt to catch a url will get a new one self._stream = None return - # Stop any existing stream worker since the url is invalid. The next - # request for this stream will restart it with the right url. - # Issue #42793 tracks improvements (e.g. preserve keepalive, smoother - # transitions across streams) + # Update the stream worker with the latest valid url if self.stream: - self.stream.stop() - self.stream = None + self.stream.update_source(self._stream.rtsp_stream_url) self._schedule_stream_refresh() async def async_will_remove_from_hass(self): diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index a8b344a98e9..e871963d2ba 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -109,8 +109,9 @@ class Stream: self.keepalive = False self.access_token = None self._thread = None - self._thread_quit = None + self._thread_quit = threading.Event() self._outputs = {} + self._fast_restart_once = False if self.options is None: self.options = {} @@ -167,7 +168,7 @@ class Stream: # The thread must have crashed/exited. Join to clean up the # previous thread. self._thread.join(timeout=0) - self._thread_quit = threading.Event() + self._thread_quit.clear() self._thread = threading.Thread( name="stream_worker", target=self._run_worker, @@ -175,6 +176,13 @@ class Stream: self._thread.start() _LOGGER.info("Started stream: %s", self.source) + def update_source(self, new_source): + """Restart the stream with a new stream source.""" + _LOGGER.debug("Updating stream source %s", self.source) + self.source = new_source + self._fast_restart_once = True + self._thread_quit.set() + def _run_worker(self): """Handle consuming streams and restart keepalive streams.""" # Keep import here so that we can import stream integration without installing reqs @@ -186,8 +194,12 @@ class Stream: start_time = time.time() stream_worker(self.hass, self, self._thread_quit) if not self.keepalive or self._thread_quit.is_set(): + if self._fast_restart_once: + # The stream source is updated, restart without any delay. + self._fast_restart_once = False + self._thread_quit.clear() + continue break - # To avoid excessive restarts, wait before restarting # As the required recovery time may be different for different setups, start # with trying a short wait_timeout and increase it on each reconnection attempt. diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 91d02664d74..0d5be68d93c 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -14,6 +14,7 @@ failure modes or corner cases like how out of order packets are handled. """ import fractions +import io import math import threading from unittest.mock import patch @@ -44,6 +45,7 @@ LONGER_TEST_SEQUENCE_LENGTH = 20 * VIDEO_FRAME_RATE OUT_OF_ORDER_PACKET_INDEX = 3 * VIDEO_FRAME_RATE PACKETS_PER_SEGMENT = SEGMENT_DURATION / PACKET_DURATION SEGMENTS_PER_PACKET = PACKET_DURATION / SEGMENT_DURATION +TIMEOUT = 15 class FakePyAvStream: @@ -178,9 +180,9 @@ class MockPyAv: def open(self, stream_source, *args, **kwargs): """Return a stream or buffer depending on args.""" - if stream_source == STREAM_SOURCE: - return self.container - return self.capture_buffer + if isinstance(stream_source, io.BytesIO): + return self.capture_buffer + return self.container async def async_decode_stream(hass, packets, py_av=None): @@ -469,3 +471,77 @@ async def test_pts_out_of_order(hass): assert all([s.duration == SEGMENT_DURATION for s in segments]) assert len(decoded_stream.video_packets) == len(packets) assert len(decoded_stream.audio_packets) == 0 + + +async def test_stream_stopped_while_decoding(hass): + """Tests that worker quits when stop() is called while decodign.""" + # Add some synchronization so that the test can pause the background + # worker. When the worker is stopped, the test invokes stop() which + # will cause the worker thread to exit once it enters the decode + # loop + worker_open = threading.Event() + worker_wake = threading.Event() + + stream = Stream(hass, STREAM_SOURCE) + stream.add_provider(STREAM_OUTPUT_FORMAT) + + py_av = MockPyAv() + py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH) + + def blocking_open(stream_source, *args, **kwargs): + # Let test know the thread is running + worker_open.set() + # Block worker thread until test wakes up + worker_wake.wait() + return py_av.open(stream_source, args, kwargs) + + with patch("av.open", new=blocking_open): + stream.start() + assert worker_open.wait(TIMEOUT) + # Note: There is a race here where the worker could start as soon + # as the wake event is sent, completing all decode work. + worker_wake.set() + stream.stop() + + +async def test_update_stream_source(hass): + """Tests that the worker is re-invoked when the stream source is updated.""" + worker_open = threading.Event() + worker_wake = threading.Event() + + stream = Stream(hass, STREAM_SOURCE) + stream.add_provider(STREAM_OUTPUT_FORMAT) + # Note that keepalive is not set here. The stream is "restarted" even though + # it is not stopping due to failure. + + py_av = MockPyAv() + py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH) + + last_stream_source = None + + def blocking_open(stream_source, *args, **kwargs): + nonlocal last_stream_source + if not isinstance(stream_source, io.BytesIO): + last_stream_source = stream_source + # Let test know the thread is running + worker_open.set() + # Block worker thread until test wakes up + worker_wake.wait() + return py_av.open(stream_source, args, kwargs) + + with patch("av.open", new=blocking_open): + stream.start() + assert worker_open.wait(TIMEOUT) + assert last_stream_source == STREAM_SOURCE + + # Update the stream source, then the test wakes up the worker and assert + # that it re-opens the new stream (the test again waits on thread_started) + worker_open.clear() + stream.update_source(STREAM_SOURCE + "-updated-source") + worker_wake.set() + assert worker_open.wait(TIMEOUT) + assert last_stream_source == STREAM_SOURCE + "-updated-source" + worker_wake.set() + + # Ccleanup + stream.stop()