diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 476b64b9650..3ce3d0af6fd 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -171,6 +171,10 @@ class Stream: from .worker import stream_worker if self._thread is None or not self._thread.isAlive(): + if self._thread is not None: + # The thread must have crashed/exited. Join to clean up the + # previous thread. + self._thread.join(timeout=0) self._thread_quit = threading.Event() self._thread = threading.Thread( name="stream_worker", diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index ea4359e2a0c..b461d17cf1c 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -2,6 +2,7 @@ from collections import deque import io import logging +import time import av @@ -35,6 +36,25 @@ def create_stream_buffer(stream_output, video_stream, audio_stream, sequence): def stream_worker(hass, stream, quit_event): + """Handle consuming streams and restart keepalive streams.""" + + wait_timeout = 0 + while not quit_event.wait(timeout=wait_timeout): + start_time = time.time() + try: + _stream_worker_internal(hass, stream, quit_event) + except av.error.FFmpegError: # pylint: disable=c-extension-no-member + _LOGGER.exception("Stream connection failed: %s", stream.source) + if not stream.keepalive or quit_event.is_set(): + break + # To avoid excessive restarts, don't restart faster than once every 40 seconds. + wait_timeout = max(40 - (time.time() - start_time), 0) + _LOGGER.debug( + "Restarting stream worker in %d seconds: %s", wait_timeout, stream.source, + ) + + +def _stream_worker_internal(hass, stream, quit_event): """Handle consuming streams.""" container = av.open(stream.source, options=stream.options) @@ -112,13 +132,15 @@ def stream_worker(hass, stream, quit_event): audio_stream = None except (av.AVError, StopIteration) as ex: - # End of stream, clear listeners and stop thread - for fmt, _ in outputs.items(): - hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None) + if not stream.keepalive: + # End of stream, clear listeners and stop thread + for fmt, _ in outputs.items(): + hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None) _LOGGER.error( "Error demuxing stream while finding first packet: %s", str(ex) ) - quit_event.set() + return False + return True def initialize_segment(video_pts): """Reset some variables and initialize outputs for each segment.""" @@ -159,7 +181,9 @@ def stream_worker(hass, stream, quit_event): packet.stream = output_streams[audio_stream] buffer.output.mux(packet) - peek_first_pts() + if not peek_first_pts(): + container.close() + return last_dts = {k: v - 1 for k, v in first_pts.items()} initialize_segment(first_pts[video_stream]) @@ -179,9 +203,10 @@ def stream_worker(hass, stream, quit_event): continue last_packet_was_without_dts = False except (av.AVError, StopIteration) as ex: - # End of stream, clear listeners and stop thread - for fmt, _ in outputs.items(): - hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None) + if not stream.keepalive: + # End of stream, clear listeners and stop thread + for fmt, _ in outputs.items(): + hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None) _LOGGER.error("Error demuxing stream: %s", str(ex)) break diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index b4c0b0e536f..863513c8157 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -2,6 +2,7 @@ from datetime import timedelta from urllib.parse import urlparse +import av import pytest from homeassistant.components.stream import request_stream @@ -9,6 +10,7 @@ from homeassistant.const import HTTP_NOT_FOUND from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util +from tests.async_mock import patch from tests.common import async_fire_time_changed from tests.components.stream.common import generate_h264_video, preload_stream @@ -122,3 +124,37 @@ async def test_stream_ended(hass): # Stop stream, if it hasn't quit already stream.stop() + + +async def test_stream_keepalive(hass): + """Test hls stream retries the stream when keepalive=True.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + # Setup demo HLS track + source = "test_stream_keepalive_source" + stream = preload_stream(hass, source) + track = stream.add_provider("hls") + track.num_segments = 2 + + cur_time = 0 + + def time_side_effect(): + nonlocal cur_time + if cur_time >= 80: + stream.keepalive = False # Thread should exit and be joinable. + cur_time += 40 + return cur_time + + with patch("av.open") as av_open, patch( + "homeassistant.components.stream.worker.time" + ) as mock_time: + av_open.side_effect = av.error.InvalidDataError(-2, "error") + mock_time.time.side_effect = time_side_effect + # Request stream + request_stream(hass, source, keepalive=True) + stream._thread.join() + stream._thread = None + assert av_open.call_count == 2 + + # Stop stream, if it hasn't quit already + stream.stop()