Remove byte-range addressed parts in stream (#55396)

Add individually addressed parts
This commit is contained in:
uvjustin 2021-08-30 13:20:19 +08:00 committed by GitHub
parent 5549a925b8
commit 071fcee9a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 112 additions and 248 deletions

View File

@ -3,9 +3,8 @@ from __future__ import annotations
import asyncio
from collections import deque
from collections.abc import Generator, Iterable
from collections.abc import Iterable
import datetime
import itertools
from typing import TYPE_CHECKING
from aiohttp import web
@ -58,10 +57,7 @@ class Segment:
start_time: datetime.datetime = attr.ib()
_stream_outputs: Iterable[StreamOutput] = attr.ib()
duration: float = attr.ib(default=0)
# Parts are stored in a dict indexed by byterange for easy lookup
# As of Python 3.7, insertion order is preserved, and we insert
# in sequential order, so the Parts are ordered
parts_by_byterange: dict[int, Part] = attr.ib(factory=dict)
parts: list[Part] = attr.ib(factory=list)
# Store text of this segment's hls playlist for reuse
# Use list[str] for easy appends
hls_playlist_template: list[str] = attr.ib(factory=list)
@ -89,13 +85,7 @@ class Segment:
@property
def data_size(self) -> int:
"""Return the size of all part data without init in bytes."""
# We can use the last part to quickly calculate the total data size.
if not self.parts_by_byterange:
return 0
last_http_range_start, last_part = next(
reversed(self.parts_by_byterange.items())
)
return last_http_range_start + len(last_part.data)
return sum(len(part.data) for part in self.parts)
@callback
def async_add_part(
@ -107,36 +97,14 @@ class Segment:
Duration is non zero only for the last part.
"""
self.parts_by_byterange[self.data_size] = part
self.parts.append(part)
self.duration = duration
for output in self._stream_outputs:
output.part_put()
def get_data(self) -> bytes:
"""Return reconstructed data for all parts as bytes, without init."""
return b"".join([part.data for part in self.parts_by_byterange.values()])
def get_aggregating_bytes(
self, start_loc: int, end_loc: int | float
) -> Generator[bytes, None, None]:
"""Yield available remaining data until segment is complete or end_loc is reached.
Begin at start_loc. End at end_loc (exclusive).
Used to help serve a range request on a segment.
"""
pos = start_loc
while (part := self.parts_by_byterange.get(pos)) or not self.complete:
if not part:
yield b""
continue
pos += len(part.data)
# Check stopping condition and trim output if necessary
if pos >= end_loc:
assert isinstance(end_loc, int)
# Trimming is probably not necessary, but it doesn't hurt
yield part.data[: len(part.data) + end_loc - pos]
return
yield part.data
return b"".join([part.data for part in self.parts])
def _render_hls_template(self, last_stream_id: int, render_parts: bool) -> str:
"""Render the HLS playlist section for the Segment.
@ -151,15 +119,12 @@ class Segment:
# This is a placeholder where the rendered parts will be inserted
self.hls_playlist_template.append("{}")
if render_parts:
for http_range_start, part in itertools.islice(
self.parts_by_byterange.items(),
self.hls_num_parts_rendered,
None,
for part_num, part in enumerate(
self.parts[self.hls_num_parts_rendered :], self.hls_num_parts_rendered
):
self.hls_playlist_parts.append(
f"#EXT-X-PART:DURATION={part.duration:.3f},URI="
f'"./segment/{self.sequence}.m4s",BYTERANGE="{len(part.data)}'
f'@{http_range_start}"{",INDEPENDENT=YES" if part.has_keyframe else ""}'
f'"./segment/{self.sequence}.{part_num}.m4s"{",INDEPENDENT=YES" if part.has_keyframe else ""}'
)
if self.complete:
# Construct the final playlist_template. The placeholder will share a line with
@ -187,7 +152,7 @@ class Segment:
self.hls_playlist_template = ["\n".join(self.hls_playlist_template)]
# lstrip discards extra preceding newline in case first render was empty
self.hls_playlist_parts = ["\n".join(self.hls_playlist_parts).lstrip()]
self.hls_num_parts_rendered = len(self.parts_by_byterange)
self.hls_num_parts_rendered = len(self.parts)
self.hls_playlist_complete = self.complete
return self.hls_playlist_template[0]
@ -208,11 +173,13 @@ class Segment:
# pylint: disable=undefined-loop-variable
if self.complete: # Next part belongs to next segment
sequence = self.sequence + 1
start = 0
part_num = 0
else: # Next part is in the same segment
sequence = self.sequence
start = self.data_size
hint = f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{sequence}.m4s",BYTERANGE-START={start}'
part_num = len(self.parts)
hint = (
f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{sequence}.{part_num}.m4s"'
)
return (playlist + "\n" + hint) if playlist else hint
@ -367,7 +334,7 @@ class StreamView(HomeAssistantView):
platform = None
async def get(
self, request: web.Request, token: str, sequence: str = ""
self, request: web.Request, token: str, sequence: str = "", part_num: str = ""
) -> web.StreamResponse:
"""Start a GET request."""
hass = request.app["hass"]
@ -383,10 +350,10 @@ class StreamView(HomeAssistantView):
# Start worker if not already started
stream.start()
return await self.handle(request, stream, sequence)
return await self.handle(request, stream, sequence, part_num)
async def handle(
self, request: web.Request, stream: Stream, sequence: str
self, request: web.Request, stream: Stream, sequence: str, part_num: str
) -> web.StreamResponse:
"""Handle the stream request."""
raise NotImplementedError()

View File

@ -34,6 +34,7 @@ def async_setup_hls(hass: HomeAssistant) -> str:
hass.http.register_view(HlsSegmentView())
hass.http.register_view(HlsInitView())
hass.http.register_view(HlsMasterPlaylistView())
hass.http.register_view(HlsPartView())
return "/api/hls/{}/master_playlist.m3u8"
@ -94,7 +95,7 @@ class HlsMasterPlaylistView(StreamView):
return "\n".join(lines) + "\n"
async def handle(
self, request: web.Request, stream: Stream, sequence: str
self, request: web.Request, stream: Stream, sequence: str, part_num: str
) -> web.Response:
"""Return m3u8 playlist."""
track = stream.add_provider(HLS_PROVIDER)
@ -220,7 +221,7 @@ class HlsPlaylistView(StreamView):
)
async def handle(
self, request: web.Request, stream: Stream, sequence: str
self, request: web.Request, stream: Stream, sequence: str, part_num: str
) -> web.Response:
"""Return m3u8 playlist."""
track: HlsStreamOutput = cast(
@ -263,7 +264,7 @@ class HlsPlaylistView(StreamView):
(last_segment := track.last_segment)
and hls_msn == last_segment.sequence
and hls_part
>= len(last_segment.parts_by_byterange)
>= len(last_segment.parts)
- 1
+ track.stream_settings.hls_advance_part_limit
):
@ -273,7 +274,7 @@ class HlsPlaylistView(StreamView):
while (
(last_segment := track.last_segment)
and hls_msn == last_segment.sequence
and hls_part >= len(last_segment.parts_by_byterange)
and hls_part >= len(last_segment.parts)
):
if not await track.part_recv(
timeout=track.stream_settings.hls_part_timeout
@ -287,8 +288,8 @@ class HlsPlaylistView(StreamView):
# request as one for Part Index 0 of the following Parent Segment.
if hls_msn + 1 == last_segment.sequence:
if not (previous_segment := track.get_segment(hls_msn)) or (
hls_part >= len(previous_segment.parts_by_byterange)
and not last_segment.parts_by_byterange
hls_part >= len(previous_segment.parts)
and not last_segment.parts
and not await track.part_recv(
timeout=track.stream_settings.hls_part_timeout
)
@ -314,7 +315,7 @@ class HlsInitView(StreamView):
cors_allowed = True
async def handle(
self, request: web.Request, stream: Stream, sequence: str
self, request: web.Request, stream: Stream, sequence: str, part_num: str
) -> web.Response:
"""Return init.mp4."""
track = stream.add_provider(HLS_PROVIDER)
@ -326,21 +327,17 @@ class HlsInitView(StreamView):
)
class HlsSegmentView(StreamView):
class HlsPartView(StreamView):
"""Stream view to serve a HLS fmp4 segment."""
url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s"
name = "api:stream:hls:segment"
url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.{part_num:\d+}.m4s"
name = "api:stream:hls:part"
cors_allowed = True
async def handle(
self, request: web.Request, stream: Stream, sequence: str
) -> web.StreamResponse:
"""Handle segments, part segments, and hinted segments.
For part and hinted segments, the start of the requested range must align
with a part boundary.
"""
self, request: web.Request, stream: Stream, sequence: str, part_num: str
) -> web.Response:
"""Handle part."""
track: HlsStreamOutput = cast(
HlsStreamOutput, stream.add_provider(HLS_PROVIDER)
)
@ -360,77 +357,58 @@ class HlsSegmentView(StreamView):
status=404,
headers={"Cache-Control": f"max-age={track.target_duration:.0f}"},
)
# If the segment is ready or has been hinted, the http_range start should be at most
# equal to the end of the currently available data.
# If the segment is complete, the http_range start should be less than the end of the
# currently available data.
# If these conditions aren't met then we return a 416.
# http_range_start can be None, so use a copy that uses 0 instead of None
if (http_start := request.http_range.start or 0) > segment.data_size or (
segment.complete and http_start >= segment.data_size
):
# If the requested part is the next one that has not arrived yet (the hinted part), wait for it.
if int(part_num) == len(segment.parts):
await track.part_recv(timeout=track.stream_settings.hls_part_timeout)
if int(part_num) >= len(segment.parts):
return web.HTTPRequestRangeNotSatisfiable(
headers={
"Cache-Control": f"max-age={track.target_duration:.0f}",
"Content-Range": f"bytes */{segment.data_size}",
}
)
headers = {
"Content-Type": "video/iso.segment",
"Cache-Control": f"max-age={6*track.target_duration:.0f}",
}
# For most cases we have a 206 partial content response.
status = 206
# For the 206 responses we need to set a Content-Range header
# See https://datatracker.ietf.org/doc/html/rfc8673#section-2
if request.http_range.stop is None:
if request.http_range.start is None:
status = 200
if segment.complete:
# This is a request for a full segment which is already complete
# We should return a standard 200 response.
return web.Response(
body=segment.get_data(), headers=headers, status=status
)
# Otherwise we still return a 200 response, but it is aggregating
http_stop = float("inf")
else:
# See https://datatracker.ietf.org/doc/html/rfc7233#section-2.1
headers[
"Content-Range"
] = f"bytes {http_start}-{(http_stop:=segment.data_size)-1}/*"
else: # The remaining cases are all 206 responses
if segment.complete:
# If the segment is complete we have total size
headers["Content-Range"] = (
f"bytes {http_start}-"
+ str(
(http_stop := min(request.http_range.stop, segment.data_size))
- 1
)
+ f"/{segment.data_size}"
)
else:
# If we don't have the total size we use a *
headers[
"Content-Range"
] = f"bytes {http_start}-{(http_stop:=request.http_range.stop)-1}/*"
# Set up streaming response that we can write to as data becomes available
response = web.StreamResponse(headers=headers, status=status)
# Waiting until we write to prepare *might* give clients more accurate TTFB
# and ABR measurements, but it is probably not very useful for us since we
# only have one rendition anyway. Just prepare here for now.
await response.prepare(request)
try:
for bytes_to_write in segment.get_aggregating_bytes(
start_loc=http_start, end_loc=http_stop
):
if bytes_to_write:
await response.write(bytes_to_write)
elif not await track.part_recv(
timeout=track.stream_settings.hls_part_timeout
):
break
except ConnectionResetError:
_LOGGER.warning("Connection reset while serving HLS partial segment")
return response
return web.Response(
body=segment.parts[int(part_num)].data,
headers={
"Content-Type": "video/iso.segment",
"Cache-Control": f"max-age={6*track.target_duration:.0f}",
},
)
class HlsSegmentView(StreamView):
    """View that serves the data of one whole HLS fmp4 segment."""

    url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s"
    name = "api:stream:hls:segment"
    cors_allowed = True

    async def handle(
        self, request: web.Request, stream: Stream, sequence: str, part_num: str
    ) -> web.StreamResponse:
        """Return the data of the segment identified by ``sequence``.

        ``part_num`` is accepted only to match the shared ``handle`` signature;
        it is not used here.

        Responds with status 404 (cacheable for one target duration) when the
        segment cannot be found, even after waiting for one more part to
        arrive. Otherwise responds with the segment's data (cacheable for six
        target durations).
        """
        track: HlsStreamOutput = cast(
            HlsStreamOutput, stream.add_provider(HLS_PROVIDER)
        )
        track.idle_timer.awake()

        # First attempt: the segment is usually already available.
        segment = track.get_segment(int(sequence))

        # The request may have arrived just before the segment was put (the
        # original author notes this can happen for a request triggered by a
        # hint for part 0). In that case wait for a single part and look the
        # segment up once more; if the wait fails, no second lookup is made.
        if not segment and await track.part_recv(
            timeout=track.stream_settings.hls_part_timeout
        ):
            segment = track.get_segment(int(sequence))

        if not segment:
            return web.Response(
                body=None,
                status=404,
                headers={"Cache-Control": f"max-age={track.target_duration:.0f}"},
            )

        response_headers = {
            "Content-Type": "video/iso.segment",
            "Cache-Control": f"max-age={6*track.target_duration:.0f}",
        }
        return web.Response(body=segment.get_data(), headers=response_headers)

View File

@ -345,13 +345,13 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync):
# Fetch the actual segments with a fake byte payload
for segment in hls.get_segments():
segment.init = INIT_BYTES
segment.parts_by_byterange = {
0: Part(
segment.parts = [
Part(
duration=SEGMENT_DURATION,
has_keyframe=True,
data=FAKE_PAYLOAD,
)
}
]
# The segment that fell off the buffer is not accessible
with patch.object(hls.stream_settings, "hls_part_timeout", 0.1):

View File

@ -59,9 +59,7 @@ def create_segment(sequence):
def complete_segment(segment):
"""Completes a segment by setting its duration."""
segment.duration = sum(
part.duration for part in segment.parts_by_byterange.values()
)
segment.duration = sum(part.duration for part in segment.parts)
def create_parts(source):
@ -90,9 +88,8 @@ def make_segment_with_parts(
"""Create a playlist response for a segment including part segments."""
response = []
for i in range(num_parts):
length, start = http_range_from_part(i)
response.append(
f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.m4s",BYTERANGE="{length}@{start}"{",INDEPENDENT=YES" if i%independent_period==0 else ""}'
f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.{i}.m4s"{",INDEPENDENT=YES" if i%independent_period==0 else ""}'
)
if discontinuity:
response.append("#EXT-X-DISCONTINUITY")
@ -110,8 +107,7 @@ def make_segment_with_parts(
def make_hint(segment, part):
"""Create a playlist response for the preload hint."""
_, start = http_range_from_part(part)
return f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{segment}.m4s",BYTERANGE-START={start}'
return f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{segment}.{part}.m4s"'
async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync):
@ -252,9 +248,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
assert await resp.text() == make_playlist(
sequence=0,
segments=[
make_segment_with_parts(
i, len(segment.parts_by_byterange), PART_INDEPENDENT_PERIOD
)
make_segment_with_parts(i, len(segment.parts), PART_INDEPENDENT_PERIOD)
for i in range(2)
],
hint=make_hint(2, 0),
@ -275,9 +269,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync):
assert await resp.text() == make_playlist(
sequence=0,
segments=[
make_segment_with_parts(
i, len(segment.parts_by_byterange), PART_INDEPENDENT_PERIOD
)
make_segment_with_parts(i, len(segment.parts), PART_INDEPENDENT_PERIOD)
for i in range(3)
],
hint=make_hint(3, 0),
@ -459,13 +451,13 @@ async def test_ll_hls_playlist_rollover_part(
*(
[
hls_client.get(
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts_by_byterange)-1}"
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts)-1}"
),
hls_client.get(
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts_by_byterange)}"
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts)}"
),
hls_client.get(
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts_by_byterange)+1}"
f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts)+1}"
),
hls_client.get("/playlist.m3u8?_HLS_msn=2&_HLS_part=0"),
]
@ -600,85 +592,32 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync)
segment.async_add_part(remaining_parts.pop(0), 0)
# Make requests for all the existing part segments
# These should succeed with a status of 206
# These should succeed
requests = asyncio.gather(
*(
hls_client.get(
"/segment/1.m4s",
headers={
"Range": f"bytes={http_range_from_part(part)[1]}-"
+ str(
http_range_from_part(part)[0]
+ http_range_from_part(part)[1]
- 1
)
},
)
hls_client.get(f"/segment/1.{part}.m4s")
for part in range(num_completed_parts)
)
)
responses = await requests
assert all(response.status == 206 for response in responses)
assert all(response.status == 200 for response in responses)
assert all(
responses[part].headers["Content-Range"]
== f"bytes {http_range_from_part(part)[1]}-"
+ str(http_range_from_part(part)[0] + http_range_from_part(part)[1] - 1)
+ "/*"
for part in range(num_completed_parts)
[
await responses[i].read() == segment.parts[i].data
for i in range(len(responses))
]
)
parts = list(segment.parts_by_byterange.values())
assert all(
[await responses[i].read() == parts[i].data for i in range(len(responses))]
)
# Make some non standard range requests.
# Request past end of previous closed segment
# Request should succeed but length will be limited to the segment length
response = await hls_client.get(
"/segment/0.m4s",
headers={"Range": f"bytes=0-{hls.get_segment(0).data_size+1}"},
)
assert response.status == 206
assert (
response.headers["Content-Range"]
== f"bytes 0-{hls.get_segment(0).data_size-1}/{hls.get_segment(0).data_size}"
)
assert (await response.read()) == hls.get_segment(0).get_data()
# Request with start range past end of current segment
# Since this is beyond the data we have (the largest starting position will be
# from a hinted request, and even that will have a starting position at
# segment.data_size), we expect a 416.
response = await hls_client.get(
"/segment/1.m4s",
headers={"Range": f"bytes={segment.data_size+1}-{VERY_LARGE_LAST_BYTE_POS}"},
)
assert response.status == 416
# Request for next segment which has not yet been hinted (we will only hint
# for this segment after segment 1 is complete).
# This should fail, but it will hold for one more part_put before failing.
hls_sync.reset_request_pool(1)
request = asyncio.create_task(
hls_client.get(
"/segment/2.m4s", headers={"Range": f"bytes=0-{VERY_LARGE_LAST_BYTE_POS}"}
)
)
request = asyncio.create_task(hls_client.get("/segment/2.0.m4s"))
await hls_sync.wait_for_handler()
hls.part_put()
response = await request
assert response.status == 404
# Make valid request for the current hint. This should succeed, but since
# it is open ended, it won't finish until the segment is complete.
hls_sync.reset_request_pool(1)
request_start = segment.data_size
request = asyncio.create_task(
hls_client.get(
"/segment/1.m4s",
headers={"Range": f"bytes={request_start}-{VERY_LARGE_LAST_BYTE_POS}"},
)
)
# Put the remaining parts and complete the segment
while remaining_parts:
await hls_sync.wait_for_handler()
@ -686,26 +625,11 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync)
segment.async_add_part(remaining_parts.pop(0), 0)
hls.part_put()
complete_segment(segment)
# Check the response
response = await request
assert response.status == 206
assert (
response.headers["Content-Range"]
== f"bytes {request_start}-{VERY_LARGE_LAST_BYTE_POS}/*"
)
assert await response.read() == SEQUENCE_BYTES[request_start:]
# Now the hint should have moved to segment 2
# The request for segment 2 which failed before should work now
# Also make an equivalent request with no Range parameters that
# will return the same content but with different headers
hls_sync.reset_request_pool(2)
requests = asyncio.gather(
hls_client.get(
"/segment/2.m4s", headers={"Range": f"bytes=0-{VERY_LARGE_LAST_BYTE_POS}"}
),
hls_client.get("/segment/2.m4s"),
)
hls_sync.reset_request_pool(1)
request = asyncio.create_task(hls_client.get("/segment/2.0.m4s"))
# Put an entire segment and its parts.
segment = create_segment(sequence=2)
hls.put(segment)
@ -716,16 +640,11 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync)
hls.part_put()
complete_segment(segment)
# Check the response
responses = await requests
assert responses[0].status == 206
response = await request
assert response.status == 200
assert (
responses[0].headers["Content-Range"] == f"bytes 0-{VERY_LARGE_LAST_BYTE_POS}/*"
)
assert responses[1].status == 200
assert "Content-Range" not in responses[1].headers
assert (
await response.read() == ALT_SEQUENCE_BYTES[: hls.get_segment(2).data_size]
for response in responses
await response.read()
== ALT_SEQUENCE_BYTES[: len(hls.get_segment(2).parts[0].data)]
)
stream_worker_sync.resume()

View File

@ -126,14 +126,14 @@ def add_parts_to_segment(segment, source):
"""Add relevant part data to segment for testing recorder."""
moof_locs = list(find_box(source.getbuffer(), b"moof")) + [len(source.getbuffer())]
segment.init = source.getbuffer()[: moof_locs[0]].tobytes()
segment.parts_by_byterange = {
moof_locs[i]: Part(
segment.parts = [
Part(
duration=None,
has_keyframe=None,
data=source.getbuffer()[moof_locs[i] : moof_locs[i + 1]],
)
for i in range(len(moof_locs) - 1)
}
]
async def test_recorder_save(tmpdir):

View File

@ -699,7 +699,7 @@ async def test_durations(hass, record_worker_sync):
# check that the Part duration metadata matches the durations in the media
running_metadata_duration = 0
for segment in complete_segments:
for part in segment.parts_by_byterange.values():
for part in segment.parts:
av_part = av.open(io.BytesIO(segment.init + part.data))
running_metadata_duration += part.duration
# av_part.duration will just return the largest dts in av_part.
@ -713,7 +713,7 @@ async def test_durations(hass, record_worker_sync):
# check that the Part durations are consistent with the Segment durations
for segment in complete_segments:
assert math.isclose(
sum(part.duration for part in segment.parts_by_byterange.values()),
sum(part.duration for part in segment.parts),
segment.duration,
abs_tol=1e-6,
)
@ -751,7 +751,7 @@ async def test_has_keyframe(hass, record_worker_sync):
# check that the Part has_keyframe metadata matches the keyframes in the media
for segment in complete_segments:
for part in segment.parts_by_byterange.values():
for part in segment.parts:
av_part = av.open(io.BytesIO(segment.init + part.data))
media_has_keyframe = any(
packet.is_keyframe for packet in av_part.demux(av_part.streams.video[0])