Fix EXT-X-PROGRAM-DATE-TIME in stream (#58036)

* Fix EXT-X-PROGRAM-DATE-TIME in stream

* Update fragment duration comments in worker

* Update duration test in worker

* Augment test on low latency playlists

* Reset start_time on discontinuity
This commit is contained in:
uvjustin 2021-10-25 11:33:41 +08:00 committed by GitHub
parent e9ca511327
commit 06008bc343
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 32 deletions

View File

@ -93,11 +93,17 @@ class SegmentBuffer:
# Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in
# a "Part" that can be combined with the data from all the other "Part"s, plus an init # a "Part" that can be combined with the data from all the other "Part"s, plus an init
# section, to reconstitute the data in a "Segment". # section, to reconstitute the data in a "Segment".
# frag_duration seems to be a minimum threshold for determining part boundaries, so some # frag_duration is the threshold for determining part boundaries, and the dts of the last
# parts may have a higher duration. Since Part Target Duration is used in LL-HLS as a # packet in the part should correspond to a duration that is smaller than this value.
# maximum threshold for part durations, we scale that number down here by .85 and hope # However, as the part duration includes the duration of the last frame, the part duration
# that the output part durations stay below the maximum Part Target Duration threshold. # will be equal to or greater than this value.
# See https://datatracker.ietf.org/doc/html/draft-pantos-hls-rfc8216bis#section-4.4.4.9 # We previously scaled this number down by .85 to account for this while keeping within
# the 15% variance allowed in part duration. However, this did not work when inputs had
# an audio stream - sometimes the fragment would get cut on the audio packet, causing
# the durations to actually be too short.
# The current approach is to use this frag_duration for creating the media while
# adjusting the metadata duration to keep the durations in the metadata below the
# part_target_duration threshold.
"frag_duration": str( "frag_duration": str(
self._stream_settings.part_target_duration * 1e6 self._stream_settings.part_target_duration * 1e6
), ),
@ -153,8 +159,6 @@ class SegmentBuffer:
): ):
# Flush segment (also flushes the stub part segment) # Flush segment (also flushes the stub part segment)
self.flush(packet, last_part=True) self.flush(packet, last_part=True)
# Reinitialize
self.reset(packet.dts)
# Mux the packet # Mux the packet
packet.stream = self._output_video_stream packet.stream = self._output_video_stream
@ -201,6 +205,10 @@ class SegmentBuffer:
# value which exceeds the part_target_duration. This can muck up the # value which exceeds the part_target_duration. This can muck up the
# duration of both this part and the next part. An easy fix is to just # duration of both this part and the next part. An easy fix is to just
# use the current packet dts and cap it by the part target duration. # use the current packet dts and cap it by the part target duration.
# The adjustment may cause a drift between this adjusted duration
# (used in the metadata) and the media duration, but the drift should be
# automatically corrected when the part duration cleanly divides the
# framerate.
current_dts = min( current_dts = min(
packet.dts, packet.dts,
self._part_start_dts self._part_start_dts
@ -226,6 +234,8 @@ class SegmentBuffer:
if last_part: if last_part:
# If we've written the last part, we can close the memory_file. # If we've written the last part, we can close the memory_file.
self._memory_file.close() # We don't need the BytesIO object anymore self._memory_file.close() # We don't need the BytesIO object anymore
# Reinitialize
self.reset(current_dts)
else: else:
# For the last part, these will get set again elsewhere so we can skip # For the last part, these will get set again elsewhere so we can skip
# setting them here. # setting them here.
@ -239,6 +249,7 @@ class SegmentBuffer:
# simple to check for discontinuity at output time, and to determine # simple to check for discontinuity at output time, and to determine
# the discontinuity sequence number. # the discontinuity sequence number.
self._stream_id += 1 self._stream_id += 1
self._start_time = datetime.datetime.utcnow()
def close(self) -> None: def close(self) -> None:
"""Close stream buffer.""" """Close stream buffer."""

View File

@ -83,15 +83,18 @@ def make_playlist(
discontinuity_sequence=0, discontinuity_sequence=0,
segments=None, segments=None,
hint=None, hint=None,
segment_duration=None,
part_target_duration=None, part_target_duration=None,
): ):
"""Create a an hls playlist response for tests to assert on.""" """Create a an hls playlist response for tests to assert on."""
if not segment_duration:
segment_duration = SEGMENT_DURATION
response = [ response = [
"#EXTM3U", "#EXTM3U",
"#EXT-X-VERSION:6", "#EXT-X-VERSION:6",
"#EXT-X-INDEPENDENT-SEGMENTS", "#EXT-X-INDEPENDENT-SEGMENTS",
'#EXT-X-MAP:URI="init.mp4"', '#EXT-X-MAP:URI="init.mp4"',
f"#EXT-X-TARGETDURATION:{SEGMENT_DURATION}", f"#EXT-X-TARGETDURATION:{segment_duration}",
f"#EXT-X-MEDIA-SEQUENCE:{sequence}", f"#EXT-X-MEDIA-SEQUENCE:{sequence}",
f"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuity_sequence}", f"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuity_sequence}",
] ]
@ -105,7 +108,7 @@ def make_playlist(
) )
else: else:
response.append( response.append(
f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START_NON_LL_HLS*SEGMENT_DURATION:.3f},PRECISE=YES", f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START_NON_LL_HLS*segment_duration:.3f},PRECISE=YES",
) )
if segments: if segments:
response.extend(segments) response.extend(segments)

View File

@ -1,10 +1,13 @@
"""The tests for hls streams.""" """The tests for hls streams."""
import asyncio import asyncio
from collections import deque
from http import HTTPStatus from http import HTTPStatus
import itertools import itertools
import math
import re import re
from urllib.parse import urlparse from urllib.parse import urlparse
from dateutil import parser
import pytest import pytest
from homeassistant.components.stream import create_stream from homeassistant.components.stream import create_stream
@ -19,7 +22,7 @@ from homeassistant.components.stream.const import (
from homeassistant.components.stream.core import Part from homeassistant.components.stream.core import Part
from homeassistant.setup import async_setup_component from homeassistant.setup import async_setup_component
from .test_hls import SEGMENT_DURATION, STREAM_SOURCE, HlsClient, make_playlist from .test_hls import STREAM_SOURCE, HlsClient, make_playlist
from tests.components.stream.common import ( from tests.components.stream.common import (
FAKE_TIME, FAKE_TIME,
@ -27,7 +30,8 @@ from tests.components.stream.common import (
generate_h264_video, generate_h264_video,
) )
TEST_PART_DURATION = 1 SEGMENT_DURATION = 6
TEST_PART_DURATION = 0.75
NUM_PART_SEGMENTS = int(-(-SEGMENT_DURATION // TEST_PART_DURATION)) NUM_PART_SEGMENTS = int(-(-SEGMENT_DURATION // TEST_PART_DURATION))
PART_INDEPENDENT_PERIOD = int(1 / TEST_PART_DURATION) or 1 PART_INDEPENDENT_PERIOD = int(1 / TEST_PART_DURATION) or 1
BYTERANGE_LENGTH = 1 BYTERANGE_LENGTH = 1
@ -98,7 +102,7 @@ def make_segment_with_parts(
"#EXT-X-PROGRAM-DATE-TIME:" "#EXT-X-PROGRAM-DATE-TIME:"
+ FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+ "Z", + "Z",
f"#EXTINF:{SEGMENT_DURATION:.3f},", f"#EXTINF:{math.ceil(SEGMENT_DURATION/TEST_PART_DURATION)*TEST_PART_DURATION:.3f},",
f"./segment/{segment}.m4s", f"./segment/{segment}.m4s",
] ]
) )
@ -124,15 +128,18 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
"stream": { "stream": {
CONF_LL_HLS: True, CONF_LL_HLS: True,
CONF_SEGMENT_DURATION: SEGMENT_DURATION, CONF_SEGMENT_DURATION: SEGMENT_DURATION,
CONF_PART_DURATION: TEST_PART_DURATION, # Use a slight mismatch in PART_DURATION to mimic
# misalignments with source DTSs
CONF_PART_DURATION: TEST_PART_DURATION - 0.01,
} }
}, },
) )
stream_worker_sync.pause() stream_worker_sync.pause()
num_playlist_segments = 3
# Setup demo HLS track # Setup demo HLS track
source = generate_h264_video(duration=SEGMENT_DURATION + 1) source = generate_h264_video(duration=num_playlist_segments * SEGMENT_DURATION + 2)
stream = create_stream(hass, source, {}) stream = create_stream(hass, source, {})
# Request stream # Request stream
@ -152,7 +159,9 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
# Fetch playlist # Fetch playlist
playlist_url = "/" + master_playlist.splitlines()[-1] playlist_url = "/" + master_playlist.splitlines()[-1]
playlist_response = await hls_client.get(playlist_url) playlist_response = await hls_client.get(
playlist_url + f"?_HLS_msn={num_playlist_segments-1}"
)
assert playlist_response.status == HTTPStatus.OK assert playlist_response.status == HTTPStatus.OK
# Fetch segments # Fetch segments
@ -181,27 +190,53 @@ async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
return False return False
return True return True
# Fetch all completed part segments # Parse playlist
part_re = re.compile( part_re = re.compile(
r'#EXT-X-PART:DURATION=[0-9].[0-9]{5,5},URI="(?P<part_url>.+?)",BYTERANGE="(?P<byterange_length>[0-9]+?)@(?P<byterange_start>[0-9]+?)"(,INDEPENDENT=YES)?' r'#EXT-X-PART:DURATION=(?P<part_duration>[0-9]{1,}.[0-9]{3,}),URI="(?P<part_url>.+?)"(,INDEPENDENT=YES)?'
) )
datetime_re = re.compile(r"#EXT-X-PROGRAM-DATE-TIME:(?P<datetime>.+)")
inf_re = re.compile(r"#EXTINF:(?P<segment_duration>[0-9]{1,}.[0-9]{3,}),")
# keep track of which tests were done (indexed by re)
tested = {regex: False for regex in (part_re, datetime_re, inf_re)}
# keep track of times and durations along playlist for checking consistency
part_durations = []
segment_duration = 0
datetimes = deque()
for line in playlist.splitlines(): for line in playlist.splitlines():
match = part_re.match(line) match = part_re.match(line)
if match: if match:
# Fetch all completed part segments
part_durations.append(float(match.group("part_duration")))
part_segment_url = "/" + match.group("part_url") part_segment_url = "/" + match.group("part_url")
byterange_end = (
int(match.group("byterange_length"))
+ int(match.group("byterange_start"))
- 1
)
part_segment_response = await hls_client.get( part_segment_response = await hls_client.get(
part_segment_url, part_segment_url,
headers={
"Range": f'bytes={match.group("byterange_start")}-{byterange_end}'
},
) )
assert part_segment_response.status == HTTPStatus.PARTIAL_CONTENT assert part_segment_response.status == HTTPStatus.OK
assert check_part_is_moof_mdat(await part_segment_response.read()) assert check_part_is_moof_mdat(await part_segment_response.read())
tested[part_re] = True
continue
match = datetime_re.match(line)
if match:
datetimes.append(parser.parse(match.group("datetime")))
# Check that segment durations are consistent with PROGRAM-DATE-TIME
if len(datetimes) > 1:
datetime_duration = (
datetimes[-1] - datetimes.popleft()
).total_seconds()
if segment_duration:
assert datetime_duration == segment_duration
tested[datetime_re] = True
continue
match = inf_re.match(line)
if match:
segment_duration = float(match.group("segment_duration"))
# Check that segment durations are consistent with part durations
if len(part_durations) > 1:
assert math.isclose(sum(part_durations), segment_duration)
tested[inf_re] = True
part_durations.clear()
# make sure all playlist tests were performed
assert all(tested.values())
stream_worker_sync.resume() stream_worker_sync.resume()
@ -252,6 +287,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
for i in range(2) for i in range(2)
], ],
hint=make_hint(2, 0), hint=make_hint(2, 0),
segment_duration=SEGMENT_DURATION,
part_target_duration=hls.stream_settings.part_target_duration, part_target_duration=hls.stream_settings.part_target_duration,
) )
@ -273,6 +309,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
for i in range(3) for i in range(3)
], ],
hint=make_hint(3, 0), hint=make_hint(3, 0),
segment_duration=SEGMENT_DURATION,
part_target_duration=hls.stream_settings.part_target_duration, part_target_duration=hls.stream_settings.part_target_duration,
) )

View File

@ -702,12 +702,21 @@ async def test_durations(hass, record_worker_sync):
for part in segment.parts: for part in segment.parts:
av_part = av.open(io.BytesIO(segment.init + part.data)) av_part = av.open(io.BytesIO(segment.init + part.data))
running_metadata_duration += part.duration running_metadata_duration += part.duration
# av_part.duration will just return the largest dts in av_part. # av_part.duration actually returns the dts of the first packet of
# When we normalize by av.time_base this should equal the running duration # the next av_part. When we normalize this by av.time_base we get
assert math.isclose( # the running duration of the media.
running_metadata_duration, # The metadata duration is slightly different. The worker has
av_part.duration / av.time_base, # some flexibility of where to set each metadata boundary, and
abs_tol=1e-6, # when the media's duration is slightly too long, the metadata
# duration is adjusted down. This means that the running metadata
# duration may be up to one video frame duration smaller than the
# part duration.
assert running_metadata_duration < av_part.duration / av.time_base + 1e-6
assert (
running_metadata_duration
> av_part.duration / av.time_base
- 1 / av_part.streams.video[0].rate
- 1e-6
) )
av_part.close() av_part.close()
# check that the Part durations are consistent with the Segment durations # check that the Part durations are consistent with the Segment durations