From 01a4a83babd19e1316c1770b65a2b12803ab1837 Mon Sep 17 00:00:00 2001 From: uvjustin <46082645+uvjustin@users.noreply.github.com> Date: Fri, 17 Jun 2022 01:48:52 +1000 Subject: [PATCH] Improve stream playback on high latency cameras (#72547) * Disable LL-HLS for HLS sources * Add extra wait for Nest cameras --- homeassistant/components/camera/__init__.py | 2 +- .../components/generic/config_flow.py | 2 +- homeassistant/components/nest/camera_sdm.py | 2 + homeassistant/components/stream/__init__.py | 72 ++++++++++++------- homeassistant/components/stream/const.py | 1 + homeassistant/components/stream/core.py | 19 ++--- homeassistant/components/stream/hls.py | 28 +++++--- homeassistant/components/stream/recorder.py | 11 ++- homeassistant/components/stream/worker.py | 25 ++++--- tests/components/stream/test_ll_hls.py | 4 +- tests/components/stream/test_worker.py | 48 ++++++++++--- 11 files changed, 145 insertions(+), 69 deletions(-) diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index 2ed8b58232d..45b77ec1bd6 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -454,7 +454,7 @@ class Camera(Entity): def __init__(self) -> None: """Initialize a camera.""" self.stream: Stream | None = None - self.stream_options: dict[str, str | bool] = {} + self.stream_options: dict[str, str | bool | float] = {} self.content_type: str = DEFAULT_CONTENT_TYPE self.access_tokens: collections.deque = collections.deque([], 2) self._warned_old_signature = False diff --git a/homeassistant/components/generic/config_flow.py b/homeassistant/components/generic/config_flow.py index 93b34133c63..3cc78fca406 100644 --- a/homeassistant/components/generic/config_flow.py +++ b/homeassistant/components/generic/config_flow.py @@ -209,7 +209,7 @@ async def async_test_stream(hass, info) -> dict[str, str]: except TemplateError as err: _LOGGER.warning("Problem rendering template %s: %s", stream_source, err) return {CONF_STREAM_SOURCE: "template_error"} - stream_options: dict[str, bool | str] = {} + stream_options: dict[str, str | bool | float] = {} if rtsp_transport := info.get(CONF_RTSP_TRANSPORT): stream_options[CONF_RTSP_TRANSPORT] = rtsp_transport if info.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): diff --git a/homeassistant/components/nest/camera_sdm.py b/homeassistant/components/nest/camera_sdm.py index 61f8ead4ea3..a089163a826 100644 --- a/homeassistant/components/nest/camera_sdm.py +++ b/homeassistant/components/nest/camera_sdm.py @@ -20,6 +20,7 @@ from google_nest_sdm.exceptions import ApiException from homeassistant.components.camera import Camera, CameraEntityFeature from homeassistant.components.camera.const import StreamType +from homeassistant.components.stream import CONF_EXTRA_PART_WAIT_TIME from homeassistant.config_entries import ConfigEntry from homeassistant.core import HomeAssistant from homeassistant.exceptions import HomeAssistantError @@ -67,6 +68,7 @@ class NestCamera(Camera): self._create_stream_url_lock = asyncio.Lock() self._stream_refresh_unsub: Callable[[], None] | None = None self._attr_is_streaming = CameraLiveStreamTrait.NAME in self._device.traits + self.stream_options[CONF_EXTRA_PART_WAIT_TIME] = 3 @property def should_poll(self) -> bool: diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index c33188fd71c..19ef009a845 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -18,6 +18,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable, Mapping +import copy import logging import re import secrets @@ -38,6 +39,7 @@ from .const import ( ATTR_ENDPOINTS, ATTR_SETTINGS, ATTR_STREAMS, + CONF_EXTRA_PART_WAIT_TIME, CONF_LL_HLS, CONF_PART_DURATION, CONF_RTSP_TRANSPORT, @@ -62,8 +64,11 @@ from .diagnostics import Diagnostics from .hls import HlsStreamOutput, async_setup_hls __all__ = [ + "ATTR_SETTINGS", + "CONF_EXTRA_PART_WAIT_TIME", "CONF_RTSP_TRANSPORT", "CONF_USE_WALLCLOCK_AS_TIMESTAMPS", + "DOMAIN", "FORMAT_CONTENT_TYPE", "HLS_PROVIDER", "OUTPUT_FORMATS", @@ -91,7 +96,7 @@ def redact_credentials(data: str) -> str: def create_stream( hass: HomeAssistant, stream_source: str, - options: dict[str, str | bool], + options: Mapping[str, str | bool | float], stream_label: str | None = None, ) -> Stream: """Create a stream with the specified identfier based on the source url. @@ -101,11 +106,35 @@ def create_stream( The stream_label is a string used as an additional message in logging. """ + + def convert_stream_options( + hass: HomeAssistant, stream_options: Mapping[str, str | bool | float] + ) -> tuple[dict[str, str], StreamSettings]: + """Convert options from stream options into PyAV options and stream settings.""" + stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS]) + pyav_options: dict[str, str] = {} + try: + STREAM_OPTIONS_SCHEMA(stream_options) + except vol.Invalid as exc: + raise HomeAssistantError("Invalid stream options") from exc + + if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME): + stream_settings.hls_part_timeout += extra_wait_time + if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT): + assert isinstance(rtsp_transport, str) + # The PyAV options currently match the stream CONF constants, but this + # will not necessarily always be the case, so they are hard coded here + pyav_options["rtsp_transport"] = rtsp_transport + if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): + pyav_options["use_wallclock_as_timestamps"] = "1" + + return pyav_options, stream_settings + if DOMAIN not in hass.config.components: raise HomeAssistantError("Stream integration is not set up.") - # Convert extra stream options into PyAV options - pyav_options = convert_stream_options(options) + # Convert extra stream options into PyAV options and stream settings + pyav_options, stream_settings = convert_stream_options(hass, options) # For RTSP streams, prefer TCP if isinstance(stream_source, str) and stream_source[:7] == "rtsp://": pyav_options = { @@ -115,7 +144,11 @@ def create_stream( } stream = Stream( - hass, stream_source, options=pyav_options, stream_label=stream_label + hass, + stream_source, + pyav_options=pyav_options, + stream_settings=stream_settings, + stream_label=stream_label, ) hass.data[DOMAIN][ATTR_STREAMS].append(stream) return stream @@ -230,13 +263,15 @@ class Stream: self, hass: HomeAssistant, source: str, - options: dict[str, str], + pyav_options: dict[str, str], + stream_settings: StreamSettings, stream_label: str | None = None, ) -> None: """Initialize a stream.""" self.hass = hass self.source = source - self.options = options + self.pyav_options = pyav_options + self._stream_settings = stream_settings self._stream_label = stream_label self.keepalive = False self.access_token: str | None = None @@ -284,7 +319,9 @@ class Stream: self.check_idle() provider = PROVIDERS[fmt]( - self.hass, IdleTimer(self.hass, timeout, idle_callback) + self.hass, + IdleTimer(self.hass, timeout, idle_callback), + self._stream_settings, ) self._outputs[fmt] = provider @@ -368,7 +405,8 @@ class Stream: try: stream_worker( self.source, - self.options, + self.pyav_options, + self._stream_settings, stream_state, self._keyframe_converter, self._thread_quit, @@ -507,22 +545,6 @@ STREAM_OPTIONS_SCHEMA: Final = vol.Schema( { vol.Optional(CONF_RTSP_TRANSPORT): vol.In(RTSP_TRANSPORTS), vol.Optional(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): bool, + vol.Optional(CONF_EXTRA_PART_WAIT_TIME): cv.positive_float, } ) - - -def convert_stream_options(stream_options: dict[str, str | bool]) -> dict[str, str]: - """Convert options from stream options into PyAV options.""" - pyav_options: dict[str, str] = {} - try: - STREAM_OPTIONS_SCHEMA(stream_options) - except vol.Invalid as exc: - raise HomeAssistantError("Invalid stream options") from exc - - if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT): - assert isinstance(rtsp_transport, str) - pyav_options["rtsp_transport"] = rtsp_transport - if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): - pyav_options["use_wallclock_as_timestamps"] = "1" - - return pyav_options diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index f8c9ba85d59..35af633435e 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -53,3 +53,4 @@ RTSP_TRANSPORTS = { "http": "HTTP", } CONF_USE_WALLCLOCK_AS_TIMESTAMPS = "use_wallclock_as_timestamps" +CONF_EXTRA_PART_WAIT_TIME = "extra_part_wait_time" diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index da18a5a6a08..c8d831157a8 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -118,6 +118,10 @@ class Segment: if self.hls_playlist_complete: return self.hls_playlist_template[0] if not self.hls_playlist_template: + # Logically EXT-X-DISCONTINUITY makes sense above the parts, but Apple's + # media stream validator seems to only want it before the segment + if last_stream_id != self.stream_id: + self.hls_playlist_template.append("#EXT-X-DISCONTINUITY") # This is a placeholder where the rendered parts will be inserted self.hls_playlist_template.append("{}") if render_parts: @@ -133,22 +137,19 @@ class Segment: # the first element to avoid an extra newline when we don't render any parts. # Append an empty string to create a trailing newline when we do render parts self.hls_playlist_parts.append("") - self.hls_playlist_template = [] - # Logically EXT-X-DISCONTINUITY would make sense above the parts, but Apple's - # media stream validator seems to only want it before the segment - if last_stream_id != self.stream_id: - self.hls_playlist_template.append("#EXT-X-DISCONTINUITY") + self.hls_playlist_template = ( + [] if last_stream_id == self.stream_id else ["#EXT-X-DISCONTINUITY"] + ) # Add the remaining segment metadata + # The placeholder goes on the same line as the next element self.hls_playlist_template.extend( [ - "#EXT-X-PROGRAM-DATE-TIME:" + "{}#EXT-X-PROGRAM-DATE-TIME:" + self.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", f"#EXTINF:{self.duration:.3f},\n./segment/{self.sequence}.m4s", ] ) - # The placeholder now goes on the same line as the first element - self.hls_playlist_template[0] = "{}" + self.hls_playlist_template[0] # Store intermediate playlist data in member variables for reuse self.hls_playlist_template = ["\n".join(self.hls_playlist_template)] @@ -237,11 +238,13 @@ class StreamOutput: self, hass: HomeAssistant, idle_timer: IdleTimer, + stream_settings: StreamSettings, deque_maxlen: int | None = None, ) -> None: """Initialize a stream output.""" self._hass = hass self.idle_timer = idle_timer + self.stream_settings = stream_settings self._event = asyncio.Event() self._part_event = asyncio.Event() self._segments: deque[Segment] = deque(maxlen=deque_maxlen) diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 8e78093d07a..efecdcbe9dc 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -9,8 +9,6 @@ from aiohttp import web from homeassistant.core import HomeAssistant, callback from .const import ( - ATTR_SETTINGS, - DOMAIN, EXT_X_START_LL_HLS, EXT_X_START_NON_LL_HLS, FORMAT_CONTENT_TYPE, @@ -47,11 +45,15 @@ def async_setup_hls(hass: HomeAssistant) -> str: class HlsStreamOutput(StreamOutput): """Represents HLS Output formats.""" - def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None: + def __init__( + self, + hass: HomeAssistant, + idle_timer: IdleTimer, + stream_settings: StreamSettings, + ) -> None: """Initialize HLS output.""" - super().__init__(hass, idle_timer, deque_maxlen=MAX_SEGMENTS) - self.stream_settings: StreamSettings = hass.data[DOMAIN][ATTR_SETTINGS] - self._target_duration = self.stream_settings.min_segment_duration + super().__init__(hass, idle_timer, stream_settings, deque_maxlen=MAX_SEGMENTS) + self._target_duration = stream_settings.min_segment_duration @property def name(self) -> str: @@ -78,14 +80,20 @@ class HlsStreamOutput(StreamOutput): ) def discontinuity(self) -> None: - """Remove incomplete segment from deque.""" + """Fix incomplete segment at end of deque.""" self._hass.loop.call_soon_threadsafe(self._async_discontinuity) @callback def _async_discontinuity(self) -> None: - """Remove incomplete segment from deque in event loop.""" - if self._segments and not self._segments[-1].complete: - self._segments.pop() + """Fix incomplete segment at end of deque in event loop.""" + # Fill in the segment duration or delete the segment if empty + if self._segments: + if (last_segment := self._segments[-1]).parts: + last_segment.duration = sum( + part.duration for part in last_segment.parts + ) + else: + self._segments.pop() class HlsMasterPlaylistView(StreamView): diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index ae1c64396c8..4d97c0d683d 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -17,7 +17,7 @@ from .const import ( RECORDER_PROVIDER, SEGMENT_CONTAINER_FORMAT, ) -from .core import PROVIDERS, IdleTimer, Segment, StreamOutput +from .core import PROVIDERS, IdleTimer, Segment, StreamOutput, StreamSettings _LOGGER = logging.getLogger(__name__) @@ -121,9 +121,14 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]) -> None: class RecorderOutput(StreamOutput): """Represents HLS Output formats.""" - def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None: + def __init__( + self, + hass: HomeAssistant, + idle_timer: IdleTimer, + stream_settings: StreamSettings, + ) -> None: """Initialize recorder output.""" - super().__init__(hass, idle_timer) + super().__init__(hass, idle_timer, stream_settings) self.video_path: str @property diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index f8d12c1cb44..4cfe8864de0 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -16,9 +16,7 @@ from homeassistant.core import HomeAssistant from . import redact_credentials from .const import ( - ATTR_SETTINGS, AUDIO_CODECS, - DOMAIN, HLS_PROVIDER, MAX_MISSING_DTS, MAX_TIMESTAMP_GAP, @@ -87,7 +85,7 @@ class StreamState: # simple to check for discontinuity at output time, and to determine # the discontinuity sequence number. self._stream_id += 1 - # Call discontinuity to remove incomplete segment from the HLS output + # Call discontinuity to fix incomplete segment in HLS output if hls_output := self._outputs_callback().get(HLS_PROVIDER): cast(HlsStreamOutput, hls_output).discontinuity() @@ -111,6 +109,7 @@ class StreamMuxer: video_stream: av.video.VideoStream, audio_stream: av.audio.stream.AudioStream | None, stream_state: StreamState, + stream_settings: StreamSettings, ) -> None: """Initialize StreamMuxer.""" self._hass = hass @@ -126,7 +125,7 @@ class StreamMuxer: self._memory_file_pos: int = cast(int, None) self._part_start_dts: int = cast(int, None) self._part_has_keyframe = False - self._stream_settings: StreamSettings = hass.data[DOMAIN][ATTR_SETTINGS] + self._stream_settings = stream_settings self._stream_state = stream_state self._start_time = datetime.datetime.utcnow() @@ -445,19 +444,20 @@ def unsupported_audio(packets: Iterator[av.Packet], audio_stream: Any) -> bool: def stream_worker( source: str, - options: dict[str, str], + pyav_options: dict[str, str], + stream_settings: StreamSettings, stream_state: StreamState, keyframe_converter: KeyFrameConverter, quit_event: Event, ) -> None: """Handle consuming streams.""" - if av.library_versions["libavformat"][0] >= 59 and "stimeout" in options: + if av.library_versions["libavformat"][0] >= 59 and "stimeout" in pyav_options: # the stimeout option was renamed to timeout as of ffmpeg 5.0 - options["timeout"] = options["stimeout"] - del options["stimeout"] + pyav_options["timeout"] = pyav_options["stimeout"] + del pyav_options["stimeout"] try: - container = av.open(source, options=options, timeout=SOURCE_TIMEOUT) + container = av.open(source, options=pyav_options, timeout=SOURCE_TIMEOUT) except av.AVError as err: raise StreamWorkerError( f"Error opening stream ({err.type}, {err.strerror}) {redact_credentials(str(source))}" @@ -480,6 +480,9 @@ def stream_worker( # Some audio streams do not have a profile and throw errors when remuxing if audio_stream and audio_stream.profile is None: audio_stream = None + # Disable ll-hls for hls inputs + if container.format.name == "hls": + stream_settings.ll_hls = False stream_state.diagnostics.set_value("container_format", container.format.name) stream_state.diagnostics.set_value("video_codec", video_stream.name) if audio_stream: @@ -535,7 +538,9 @@ def stream_worker( "Error demuxing stream while finding first packet: %s" % str(ex) ) from ex - muxer = StreamMuxer(stream_state.hass, video_stream, audio_stream, stream_state) + muxer = StreamMuxer( + stream_state.hass, video_stream, audio_stream, stream_state, stream_settings + ) muxer.reset(start_dts) # Mux the first keyframe, then proceed through the rest of the packets diff --git a/tests/components/stream/test_ll_hls.py b/tests/components/stream/test_ll_hls.py index 4aaec93d646..447b9ff58e9 100644 --- a/tests/components/stream/test_ll_hls.py +++ b/tests/components/stream/test_ll_hls.py @@ -91,12 +91,12 @@ def make_segment_with_parts( ): """Create a playlist response for a segment including part segments.""" response = [] + if discontinuity: + response.append("#EXT-X-DISCONTINUITY") for i in range(num_parts): response.append( f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.{i}.m4s"{",INDEPENDENT=YES" if i%independent_period==0 else ""}' ) - if discontinuity: - response.append("#EXT-X-DISCONTINUITY") response.extend( [ "#EXT-X-PROGRAM-DATE-TIME:" diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index a70f2be81b8..298d7287e69 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -268,17 +268,24 @@ class MockPyAv: return self.container -def run_worker(hass, stream, stream_source): +def run_worker(hass, stream, stream_source, stream_settings=None): """Run the stream worker under test.""" stream_state = StreamState(hass, stream.outputs, stream._diagnostics) stream_worker( - stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event() + stream_source, + {}, + stream_settings or hass.data[DOMAIN][ATTR_SETTINGS], + stream_state, + KeyFrameConverter(hass), + threading.Event(), ) -async def async_decode_stream(hass, packets, py_av=None): +async def async_decode_stream(hass, packets, py_av=None, stream_settings=None): """Start a stream worker that decodes incoming stream packets into output segments.""" - stream = Stream(hass, STREAM_SOURCE, {}) + stream = Stream( + hass, STREAM_SOURCE, {}, stream_settings or hass.data[DOMAIN][ATTR_SETTINGS] + ) stream.add_provider(HLS_PROVIDER) if not py_av: @@ -290,7 +297,7 @@ async def async_decode_stream(hass, packets, py_av=None): side_effect=py_av.capture_buffer.capture_output_segment, ): try: - run_worker(hass, stream, STREAM_SOURCE) + run_worker(hass, stream, STREAM_SOURCE, stream_settings) except StreamEndedError: # Tests only use a limited number of packets, then the worker exits as expected. In # production, stream ending would be unexpected. @@ -304,7 +311,7 @@ async def async_decode_stream(hass, packets, py_av=None): async def test_stream_open_fails(hass): """Test failure on stream open.""" - stream = Stream(hass, STREAM_SOURCE, {}) + stream = Stream(hass, STREAM_SOURCE, {}, hass.data[DOMAIN][ATTR_SETTINGS]) stream.add_provider(HLS_PROVIDER) with patch("av.open") as av_open, pytest.raises(StreamWorkerError): av_open.side_effect = av.error.InvalidDataError(-2, "error") @@ -637,7 +644,7 @@ async def test_stream_stopped_while_decoding(hass): worker_open = threading.Event() worker_wake = threading.Event() - stream = Stream(hass, STREAM_SOURCE, {}) + stream = Stream(hass, STREAM_SOURCE, {}, hass.data[DOMAIN][ATTR_SETTINGS]) stream.add_provider(HLS_PROVIDER) py_av = MockPyAv() @@ -667,7 +674,7 @@ async def test_update_stream_source(hass): worker_open = threading.Event() worker_wake = threading.Event() - stream = Stream(hass, STREAM_SOURCE, {}) + stream = Stream(hass, STREAM_SOURCE, {}, hass.data[DOMAIN][ATTR_SETTINGS]) stream.add_provider(HLS_PROVIDER) # Note that retries are disabled by default in tests, however the stream is "restarted" when # the stream source is updated. @@ -709,7 +716,9 @@ async def test_update_stream_source(hass): async def test_worker_log(hass, caplog): """Test that the worker logs the url without username and password.""" - stream = Stream(hass, "https://abcd:efgh@foo.bar", {}) + stream = Stream( + hass, "https://abcd:efgh@foo.bar", {}, hass.data[DOMAIN][ATTR_SETTINGS] + ) stream.add_provider(HLS_PROVIDER) with patch("av.open") as av_open, pytest.raises(StreamWorkerError) as err: @@ -906,3 +915,24 @@ async def test_get_image(hass, record_worker_sync): assert await stream.async_get_image() == EMPTY_8_6_JPEG await stream.stop() + + +async def test_worker_disable_ll_hls(hass): + """Test that the worker disables ll-hls for hls inputs.""" + stream_settings = StreamSettings( + ll_hls=True, + min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS + - SEGMENT_DURATION_ADJUSTER, + part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS, + hls_advance_part_limit=3, + hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS, + ) + py_av = MockPyAv() + py_av.container.format.name = "hls" + await async_decode_stream( + hass, + PacketSequence(TEST_SEQUENCE_LENGTH), + py_av=py_av, + stream_settings=stream_settings, + ) + assert stream_settings.ll_hls is False