diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index a576ff6d02b..398d45595d3 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -93,11 +93,17 @@ class SegmentBuffer: # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in # a "Part" that can be combined with the data from all the other "Part"s, plus an init # section, to reconstitute the data in a "Segment". - # frag_duration seems to be a minimum threshold for determining part boundaries, so some - # parts may have a higher duration. Since Part Target Duration is used in LL-HLS as a - # maximum threshold for part durations, we scale that number down here by .85 and hope - # that the output part durations stay below the maximum Part Target Duration threshold. - # See https://datatracker.ietf.org/doc/html/draft-pantos-hls-rfc8216bis#section-4.4.4.9 + # frag_duration is the threshold for determining part boundaries, and the dts of the last + # packet in the part should correspond to a duration that is smaller than this value. + # However, as the part duration includes the duration of the last frame, the part duration + # will be equal to or greater than this value. + # We previously scaled this number down by .85 to account for this while keeping within + # the 15% variance allowed in part duration. However, this did not work when inputs had + # an audio stream - sometimes the fragment would get cut on the audio packet, causing + # the durations to actually be to short. + # The current approach is to use this frag_duration for creating the media while + # adjusting the metadata duration to keep the durations in the metadata below the + # part_target_duration threshold. "frag_duration": str( self._stream_settings.part_target_duration * 1e6 ), @@ -153,8 +159,6 @@ class SegmentBuffer: ): # Flush segment (also flushes the stub part segment) self.flush(packet, last_part=True) - # Reinitialize - self.reset(packet.dts) # Mux the packet packet.stream = self._output_video_stream @@ -201,6 +205,10 @@ class SegmentBuffer: # value which exceeds the part_target_duration. This can muck up the # duration of both this part and the next part. An easy fix is to just # use the current packet dts and cap it by the part target duration. + # The adjustment may cause a drift between this adjusted duration + # (used in the metadata) and the media duration, but the drift should be + # automatically corrected when the part duration cleanly divides the + # framerate. current_dts = min( packet.dts, self._part_start_dts @@ -226,6 +234,8 @@ class SegmentBuffer: if last_part: # If we've written the last part, we can close the memory_file. self._memory_file.close() # We don't need the BytesIO object anymore + # Reinitialize + self.reset(current_dts) else: # For the last part, these will get set again elsewhere so we can skip # setting them here. @@ -239,6 +249,7 @@ class SegmentBuffer: # simple to check for discontinuity at output time, and to determine # the discontinuity sequence number. self._stream_id += 1 + self._start_time = datetime.datetime.utcnow() def close(self) -> None: """Close stream buffer.""" diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index c3c4779a948..07c8cc88a65 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -83,15 +83,18 @@ def make_playlist( discontinuity_sequence=0, segments=None, hint=None, + segment_duration=None, part_target_duration=None, ): """Create a an hls playlist response for tests to assert on.""" + if not segment_duration: + segment_duration = SEGMENT_DURATION response = [ "#EXTM3U", "#EXT-X-VERSION:6", "#EXT-X-INDEPENDENT-SEGMENTS", '#EXT-X-MAP:URI="init.mp4"', - f"#EXT-X-TARGETDURATION:{SEGMENT_DURATION}", + f"#EXT-X-TARGETDURATION:{segment_duration}", f"#EXT-X-MEDIA-SEQUENCE:{sequence}", f"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuity_sequence}", ] @@ -105,7 +108,7 @@ def make_playlist( ) else: response.append( - f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START_NON_LL_HLS*SEGMENT_DURATION:.3f},PRECISE=YES", + f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START_NON_LL_HLS*segment_duration:.3f},PRECISE=YES", ) if segments: response.extend(segments) diff --git a/tests/components/stream/test_ll_hls.py b/tests/components/stream/test_ll_hls.py index 324b1435110..1156832ada9 100644 --- a/tests/components/stream/test_ll_hls.py +++ b/tests/components/stream/test_ll_hls.py @@ -1,10 +1,13 @@ """The tests for hls streams.""" import asyncio +from collections import deque from http import HTTPStatus import itertools +import math import re from urllib.parse import urlparse +from dateutil import parser import pytest from homeassistant.components.stream import create_stream @@ -19,7 +22,7 @@ from homeassistant.components.stream.const import ( from homeassistant.components.stream.core import Part from homeassistant.setup import async_setup_component -from .test_hls import SEGMENT_DURATION, STREAM_SOURCE, HlsClient, make_playlist +from .test_hls import STREAM_SOURCE, HlsClient, make_playlist from tests.components.stream.common import ( FAKE_TIME, @@ -27,7 +30,8 @@ from tests.components.stream.common import ( generate_h264_video, ) -TEST_PART_DURATION = 1 +SEGMENT_DURATION = 6 +TEST_PART_DURATION = 0.75 NUM_PART_SEGMENTS = int(-(-SEGMENT_DURATION // TEST_PART_DURATION)) PART_INDEPENDENT_PERIOD = int(1 / TEST_PART_DURATION) or 1 BYTERANGE_LENGTH = 1 @@ -98,7 +102,7 @@ def make_segment_with_parts( "#EXT-X-PROGRAM-DATE-TIME:" + FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", - f"#EXTINF:{SEGMENT_DURATION:.3f},", + f"#EXTINF:{math.ceil(SEGMENT_DURATION/TEST_PART_DURATION)*TEST_PART_DURATION:.3f},", f"./segment/{segment}.m4s", ] ) @@ -124,15 +128,18 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync): "stream": { CONF_LL_HLS: True, CONF_SEGMENT_DURATION: SEGMENT_DURATION, - CONF_PART_DURATION: TEST_PART_DURATION, + # Use a slight mismatch in PART_DURATION to mimic + # misalignments with source DTSs + CONF_PART_DURATION: TEST_PART_DURATION - 0.01, } }, ) stream_worker_sync.pause() + num_playlist_segments = 3 # Setup demo HLS track - source = generate_h264_video(duration=SEGMENT_DURATION + 1) + source = generate_h264_video(duration=num_playlist_segments * SEGMENT_DURATION + 2) stream = create_stream(hass, source, {}) # Request stream @@ -152,7 +159,9 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync): # Fetch playlist playlist_url = "/" + master_playlist.splitlines()[-1] - playlist_response = await hls_client.get(playlist_url) + playlist_response = await hls_client.get( + playlist_url + f"?_HLS_msn={num_playlist_segments-1}" + ) assert playlist_response.status == HTTPStatus.OK # Fetch segments @@ -181,27 +190,53 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync): return False return True - # Fetch all completed part segments + # Parse playlist part_re = re.compile( - r'#EXT-X-PART:DURATION=[0-9].[0-9]{5,5},URI="(?P.+?)",BYTERANGE="(?P[0-9]+?)@(?P[0-9]+?)"(,INDEPENDENT=YES)?' + r'#EXT-X-PART:DURATION=(?P[0-9]{1,}.[0-9]{3,}),URI="(?P.+?)"(,INDEPENDENT=YES)?' ) + datetime_re = re.compile(r"#EXT-X-PROGRAM-DATE-TIME:(?P.+)") + inf_re = re.compile(r"#EXTINF:(?P[0-9]{1,}.[0-9]{3,}),") + # keep track of which tests were done (indexed by re) + tested = {regex: False for regex in (part_re, datetime_re, inf_re)} + # keep track of times and durations along playlist for checking consistency + part_durations = [] + segment_duration = 0 + datetimes = deque() for line in playlist.splitlines(): match = part_re.match(line) if match: + # Fetch all completed part segments + part_durations.append(float(match.group("part_duration"))) part_segment_url = "/" + match.group("part_url") - byterange_end = ( - int(match.group("byterange_length")) - + int(match.group("byterange_start")) - - 1 - ) part_segment_response = await hls_client.get( part_segment_url, - headers={ - "Range": f'bytes={match.group("byterange_start")}-{byterange_end}' - }, ) - assert part_segment_response.status == HTTPStatus.PARTIAL_CONTENT + assert part_segment_response.status == HTTPStatus.OK assert check_part_is_moof_mdat(await part_segment_response.read()) + tested[part_re] = True + continue + match = datetime_re.match(line) + if match: + datetimes.append(parser.parse(match.group("datetime"))) + # Check that segment durations are consistent with PROGRAM-DATE-TIME + if len(datetimes) > 1: + datetime_duration = ( + datetimes[-1] - datetimes.popleft() + ).total_seconds() + if segment_duration: + assert datetime_duration == segment_duration + tested[datetime_re] = True + continue + match = inf_re.match(line) + if match: + segment_duration = float(match.group("segment_duration")) + # Check that segment durations are consistent with part durations + if len(part_durations) > 1: + assert math.isclose(sum(part_durations), segment_duration) + tested[inf_re] = True + part_durations.clear() + # make sure all playlist tests were performed + assert all(tested.values()) stream_worker_sync.resume() @@ -252,6 +287,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync): for i in range(2) ], hint=make_hint(2, 0), + segment_duration=SEGMENT_DURATION, part_target_duration=hls.stream_settings.part_target_duration, ) @@ -273,6 +309,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync): for i in range(3) ], hint=make_hint(3, 0), + segment_duration=SEGMENT_DURATION, part_target_duration=hls.stream_settings.part_target_duration, ) diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index e353f950aea..12f859b203b 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -702,12 +702,21 @@ async def test_durations(hass, record_worker_sync): for part in segment.parts: av_part = av.open(io.BytesIO(segment.init + part.data)) running_metadata_duration += part.duration - # av_part.duration will just return the largest dts in av_part. - # When we normalize by av.time_base this should equal the running duration - assert math.isclose( - running_metadata_duration, - av_part.duration / av.time_base, - abs_tol=1e-6, + # av_part.duration actually returns the dts of the first packet of + # the next av_part. When we normalize this by av.time_base we get + # the running duration of the media. + # The metadata duration is slightly different. The worker has + # some flexibility of where to set each metadata boundary, and + # when the media's duration is slightly too long, the metadata + # duration is adjusted down. This means that the running metadata + # duration may be up to one video frame duration smaller than the + # part duration. + assert running_metadata_duration < av_part.duration / av.time_base + 1e-6 + assert ( + running_metadata_duration + > av_part.duration / av.time_base + - 1 / av_part.streams.video[0].rate + - 1e-6 ) av_part.close() # check that the Part durations are consistent with the Segment durations