diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 77e41511b92..998e27dcaec 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -3,9 +3,8 @@ from __future__ import annotations import asyncio from collections import deque -from collections.abc import Generator, Iterable +from collections.abc import Iterable import datetime -import itertools from typing import TYPE_CHECKING from aiohttp import web @@ -58,10 +57,7 @@ class Segment: start_time: datetime.datetime = attr.ib() _stream_outputs: Iterable[StreamOutput] = attr.ib() duration: float = attr.ib(default=0) - # Parts are stored in a dict indexed by byterange for easy lookup - # As of Python 3.7, insertion order is preserved, and we insert - # in sequential order, so the Parts are ordered - parts_by_byterange: dict[int, Part] = attr.ib(factory=dict) + parts: list[Part] = attr.ib(factory=list) # Store text of this segment's hls playlist for reuse # Use list[str] for easy appends hls_playlist_template: list[str] = attr.ib(factory=list) @@ -89,13 +85,7 @@ class Segment: @property def data_size(self) -> int: """Return the size of all part data without init in bytes.""" - # We can use the last part to quickly calculate the total data size. - if not self.parts_by_byterange: - return 0 - last_http_range_start, last_part = next( - reversed(self.parts_by_byterange.items()) - ) - return last_http_range_start + len(last_part.data) + return sum(len(part.data) for part in self.parts) @callback def async_add_part( @@ -107,36 +97,14 @@ class Segment: Duration is non zero only for the last part. """ - self.parts_by_byterange[self.data_size] = part + self.parts.append(part) self.duration = duration for output in self._stream_outputs: output.part_put() def get_data(self) -> bytes: """Return reconstructed data for all parts as bytes, without init.""" - return b"".join([part.data for part in self.parts_by_byterange.values()]) - - def get_aggregating_bytes( - self, start_loc: int, end_loc: int | float - ) -> Generator[bytes, None, None]: - """Yield available remaining data until segment is complete or end_loc is reached. - - Begin at start_loc. End at end_loc (exclusive). - Used to help serve a range request on a segment. - """ - pos = start_loc - while (part := self.parts_by_byterange.get(pos)) or not self.complete: - if not part: - yield b"" - continue - pos += len(part.data) - # Check stopping condition and trim output if necessary - if pos >= end_loc: - assert isinstance(end_loc, int) - # Trimming is probably not necessary, but it doesn't hurt - yield part.data[: len(part.data) + end_loc - pos] - return - yield part.data + return b"".join([part.data for part in self.parts]) def _render_hls_template(self, last_stream_id: int, render_parts: bool) -> str: """Render the HLS playlist section for the Segment. @@ -151,15 +119,12 @@ class Segment: # This is a placeholder where the rendered parts will be inserted self.hls_playlist_template.append("{}") if render_parts: - for http_range_start, part in itertools.islice( - self.parts_by_byterange.items(), - self.hls_num_parts_rendered, - None, + for part_num, part in enumerate( + self.parts[self.hls_num_parts_rendered :], self.hls_num_parts_rendered ): self.hls_playlist_parts.append( f"#EXT-X-PART:DURATION={part.duration:.3f},URI=" - f'"./segment/{self.sequence}.m4s",BYTERANGE="{len(part.data)}' - f'@{http_range_start}"{",INDEPENDENT=YES" if part.has_keyframe else ""}' + f'"./segment/{self.sequence}.{part_num}.m4s"{",INDEPENDENT=YES" if part.has_keyframe else ""}' ) if self.complete: # Construct the final playlist_template. The placeholder will share a line with @@ -187,7 +152,7 @@ class Segment: self.hls_playlist_template = ["\n".join(self.hls_playlist_template)] # lstrip discards extra preceding newline in case first render was empty self.hls_playlist_parts = ["\n".join(self.hls_playlist_parts).lstrip()] - self.hls_num_parts_rendered = len(self.parts_by_byterange) + self.hls_num_parts_rendered = len(self.parts) self.hls_playlist_complete = self.complete return self.hls_playlist_template[0] @@ -208,11 +173,13 @@ class Segment: # pylint: disable=undefined-loop-variable if self.complete: # Next part belongs to next segment sequence = self.sequence + 1 - start = 0 + part_num = 0 else: # Next part is in the same segment sequence = self.sequence - start = self.data_size - hint = f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{sequence}.m4s",BYTERANGE-START={start}' + part_num = len(self.parts) + hint = ( + f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{sequence}.{part_num}.m4s"' + ) return (playlist + "\n" + hint) if playlist else hint @@ -367,7 +334,7 @@ class StreamView(HomeAssistantView): platform = None async def get( - self, request: web.Request, token: str, sequence: str = "" + self, request: web.Request, token: str, sequence: str = "", part_num: str = "" ) -> web.StreamResponse: """Start a GET request.""" hass = request.app["hass"] @@ -383,10 +350,10 @@ class StreamView(HomeAssistantView): # Start worker if not already started stream.start() - return await self.handle(request, stream, sequence) + return await self.handle(request, stream, sequence, part_num) async def handle( - self, request: web.Request, stream: Stream, sequence: str + self, request: web.Request, stream: Stream, sequence: str, part_num: str ) -> web.StreamResponse: """Handle the stream request.""" raise NotImplementedError() diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 9b154e9236b..39ea9a5e8c0 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -34,6 +34,7 @@ def async_setup_hls(hass: HomeAssistant) -> str: hass.http.register_view(HlsSegmentView()) hass.http.register_view(HlsInitView()) hass.http.register_view(HlsMasterPlaylistView()) + hass.http.register_view(HlsPartView()) return "/api/hls/{}/master_playlist.m3u8" @@ -94,7 +95,7 @@ class HlsMasterPlaylistView(StreamView): return "\n".join(lines) + "\n" async def handle( - self, request: web.Request, stream: Stream, sequence: str + self, request: web.Request, stream: Stream, sequence: str, part_num: str ) -> web.Response: """Return m3u8 playlist.""" track = stream.add_provider(HLS_PROVIDER) @@ -220,7 +221,7 @@ class HlsPlaylistView(StreamView): ) async def handle( - self, request: web.Request, stream: Stream, sequence: str + self, request: web.Request, stream: Stream, sequence: str, part_num: str ) -> web.Response: """Return m3u8 playlist.""" track: HlsStreamOutput = cast( @@ -263,7 +264,7 @@ class HlsPlaylistView(StreamView): (last_segment := track.last_segment) and hls_msn == last_segment.sequence and hls_part - >= len(last_segment.parts_by_byterange) + >= len(last_segment.parts) - 1 + track.stream_settings.hls_advance_part_limit ): @@ -273,7 +274,7 @@ class HlsPlaylistView(StreamView): while ( (last_segment := track.last_segment) and hls_msn == last_segment.sequence - and hls_part >= len(last_segment.parts_by_byterange) + and hls_part >= len(last_segment.parts) ): if not await track.part_recv( timeout=track.stream_settings.hls_part_timeout @@ -287,8 +288,8 @@ class HlsPlaylistView(StreamView): # request as one for Part Index 0 of the following Parent Segment. if hls_msn + 1 == last_segment.sequence: if not (previous_segment := track.get_segment(hls_msn)) or ( - hls_part >= len(previous_segment.parts_by_byterange) - and not last_segment.parts_by_byterange + hls_part >= len(previous_segment.parts) + and not last_segment.parts and not await track.part_recv( timeout=track.stream_settings.hls_part_timeout ) @@ -314,7 +315,7 @@ class HlsInitView(StreamView): cors_allowed = True async def handle( - self, request: web.Request, stream: Stream, sequence: str + self, request: web.Request, stream: Stream, sequence: str, part_num: str ) -> web.Response: """Return init.mp4.""" track = stream.add_provider(HLS_PROVIDER) @@ -326,21 +327,17 @@ class HlsInitView(StreamView): ) -class HlsSegmentView(StreamView): +class HlsPartView(StreamView): """Stream view to serve a HLS fmp4 segment.""" - url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s" - name = "api:stream:hls:segment" + url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.{part_num:\d+}.m4s" + name = "api:stream:hls:part" cors_allowed = True async def handle( - self, request: web.Request, stream: Stream, sequence: str - ) -> web.StreamResponse: - """Handle segments, part segments, and hinted segments. - - For part and hinted segments, the start of the requested range must align - with a part boundary. - """ + self, request: web.Request, stream: Stream, sequence: str, part_num: str + ) -> web.Response: + """Handle part.""" track: HlsStreamOutput = cast( HlsStreamOutput, stream.add_provider(HLS_PROVIDER) ) @@ -360,77 +357,58 @@ class HlsSegmentView(StreamView): status=404, headers={"Cache-Control": f"max-age={track.target_duration:.0f}"}, ) - # If the segment is ready or has been hinted, the http_range start should be at most - # equal to the end of the currently available data. - # If the segment is complete, the http_range start should be less than the end of the - # currently available data. - # If these conditions aren't met then we return a 416. - # http_range_start can be None, so use a copy that uses 0 instead of None - if (http_start := request.http_range.start or 0) > segment.data_size or ( - segment.complete and http_start >= segment.data_size - ): + # If the part is ready or has been hinted, + if int(part_num) == len(segment.parts): + await track.part_recv(timeout=track.stream_settings.hls_part_timeout) + if int(part_num) >= len(segment.parts): return web.HTTPRequestRangeNotSatisfiable( headers={ "Cache-Control": f"max-age={track.target_duration:.0f}", - "Content-Range": f"bytes */{segment.data_size}", } ) - headers = { - "Content-Type": "video/iso.segment", - "Cache-Control": f"max-age={6*track.target_duration:.0f}", - } - # For most cases we have a 206 partial content response. - status = 206 - # For the 206 responses we need to set a Content-Range header - # See https://datatracker.ietf.org/doc/html/rfc8673#section-2 - if request.http_range.stop is None: - if request.http_range.start is None: - status = 200 - if segment.complete: - # This is a request for a full segment which is already complete - # We should return a standard 200 response. - return web.Response( - body=segment.get_data(), headers=headers, status=status - ) - # Otherwise we still return a 200 response, but it is aggregating - http_stop = float("inf") - else: - # See https://datatracker.ietf.org/doc/html/rfc7233#section-2.1 - headers[ - "Content-Range" - ] = f"bytes {http_start}-{(http_stop:=segment.data_size)-1}/*" - else: # The remaining cases are all 206 responses - if segment.complete: - # If the segment is complete we have total size - headers["Content-Range"] = ( - f"bytes {http_start}-" - + str( - (http_stop := min(request.http_range.stop, segment.data_size)) - - 1 - ) - + f"/{segment.data_size}" - ) - else: - # If we don't have the total size we use a * - headers[ - "Content-Range" - ] = f"bytes {http_start}-{(http_stop:=request.http_range.stop)-1}/*" - # Set up streaming response that we can write to as data becomes available - response = web.StreamResponse(headers=headers, status=status) - # Waiting until we write to prepare *might* give clients more accurate TTFB - # and ABR measurements, but it is probably not very useful for us since we - # only have one rendition anyway. Just prepare here for now. - await response.prepare(request) - try: - for bytes_to_write in segment.get_aggregating_bytes( - start_loc=http_start, end_loc=http_stop - ): - if bytes_to_write: - await response.write(bytes_to_write) - elif not await track.part_recv( - timeout=track.stream_settings.hls_part_timeout - ): - break - except ConnectionResetError: - _LOGGER.warning("Connection reset while serving HLS partial segment") - return response + return web.Response( + body=segment.parts[int(part_num)].data, + headers={ + "Content-Type": "video/iso.segment", + "Cache-Control": f"max-age={6*track.target_duration:.0f}", + }, + ) + + +class HlsSegmentView(StreamView): + """Stream view to serve a HLS fmp4 segment.""" + + url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s" + name = "api:stream:hls:segment" + cors_allowed = True + + async def handle( + self, request: web.Request, stream: Stream, sequence: str, part_num: str + ) -> web.StreamResponse: + """Handle segments.""" + track: HlsStreamOutput = cast( + HlsStreamOutput, stream.add_provider(HLS_PROVIDER) + ) + track.idle_timer.awake() + # Ensure that we have a segment. If the request is from a hint for part 0 + # of a segment, there is a small chance it may have arrived before the + # segment has been put. If this happens, wait for one part and retry. + if not ( + (segment := track.get_segment(int(sequence))) + or ( + await track.part_recv(timeout=track.stream_settings.hls_part_timeout) + and (segment := track.get_segment(int(sequence))) + ) + ): + return web.Response( + body=None, + status=404, + headers={"Cache-Control": f"max-age={track.target_duration:.0f}"}, + ) + return web.Response( + body=segment.get_data(), + headers={ + "Content-Type": "video/iso.segment", + "Cache-Control": f"max-age={6*track.target_duration:.0f}", + }, + ) diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index 4b0cb0322ce..da040f6646a 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -345,13 +345,13 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync): # Fetch the actual segments with a fake byte payload for segment in hls.get_segments(): segment.init = INIT_BYTES - segment.parts_by_byterange = { - 0: Part( + segment.parts = [ + Part( duration=SEGMENT_DURATION, has_keyframe=True, data=FAKE_PAYLOAD, ) - } + ] # The segment that fell off the buffer is not accessible with patch.object(hls.stream_settings, "hls_part_timeout", 0.1): diff --git a/tests/components/stream/test_ll_hls.py b/tests/components/stream/test_ll_hls.py index 8e512e0723e..ab1c01adce8 100644 --- a/tests/components/stream/test_ll_hls.py +++ b/tests/components/stream/test_ll_hls.py @@ -59,9 +59,7 @@ def create_segment(sequence): def complete_segment(segment): """Completes a segment by setting its duration.""" - segment.duration = sum( - part.duration for part in segment.parts_by_byterange.values() - ) + segment.duration = sum(part.duration for part in segment.parts) def create_parts(source): @@ -90,9 +88,8 @@ def make_segment_with_parts( """Create a playlist response for a segment including part segments.""" response = [] for i in range(num_parts): - length, start = http_range_from_part(i) response.append( - f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.m4s",BYTERANGE="{length}@{start}"{",INDEPENDENT=YES" if i%independent_period==0 else ""}' + f'#EXT-X-PART:DURATION={TEST_PART_DURATION:.3f},URI="./segment/{segment}.{i}.m4s"{",INDEPENDENT=YES" if i%independent_period==0 else ""}' ) if discontinuity: response.append("#EXT-X-DISCONTINUITY") @@ -110,8 +107,7 @@ def make_segment_with_parts( def make_hint(segment, part): """Create a playlist response for the preload hint.""" - _, start = http_range_from_part(part) - return f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{segment}.m4s",BYTERANGE-START={start}' + return f'#EXT-X-PRELOAD-HINT:TYPE=PART,URI="./segment/{segment}.{part}.m4s"' async def test_ll_hls_stream(hass, hls_stream, stream_worker_sync): @@ -252,9 +248,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync): assert await resp.text() == make_playlist( sequence=0, segments=[ - make_segment_with_parts( - i, len(segment.parts_by_byterange), PART_INDEPENDENT_PERIOD - ) + make_segment_with_parts(i, len(segment.parts), PART_INDEPENDENT_PERIOD) for i in range(2) ], hint=make_hint(2, 0), @@ -275,9 +269,7 @@ async def test_ll_hls_playlist_view(hass, hls_stream, stream_worker_sync): assert await resp.text() == make_playlist( sequence=0, segments=[ - make_segment_with_parts( - i, len(segment.parts_by_byterange), PART_INDEPENDENT_PERIOD - ) + make_segment_with_parts(i, len(segment.parts), PART_INDEPENDENT_PERIOD) for i in range(3) ], hint=make_hint(3, 0), @@ -459,13 +451,13 @@ async def test_ll_hls_playlist_rollover_part( *( [ hls_client.get( - f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts_by_byterange)-1}" + f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts)-1}" ), hls_client.get( - f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts_by_byterange)}" + f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts)}" ), hls_client.get( - f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts_by_byterange)+1}" + f"/playlist.m3u8?_HLS_msn=1&_HLS_part={len(segment.parts)+1}" ), hls_client.get("/playlist.m3u8?_HLS_msn=2&_HLS_part=0"), ] @@ -600,85 +592,32 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync) segment.async_add_part(remaining_parts.pop(0), 0) # Make requests for all the existing part segments - # These should succeed with a status of 206 + # These should succeed requests = asyncio.gather( *( - hls_client.get( - "/segment/1.m4s", - headers={ - "Range": f"bytes={http_range_from_part(part)[1]}-" - + str( - http_range_from_part(part)[0] - + http_range_from_part(part)[1] - - 1 - ) - }, - ) + hls_client.get(f"/segment/1.{part}.m4s") for part in range(num_completed_parts) ) ) responses = await requests - assert all(response.status == 206 for response in responses) + assert all(response.status == 200 for response in responses) assert all( - responses[part].headers["Content-Range"] - == f"bytes {http_range_from_part(part)[1]}-" - + str(http_range_from_part(part)[0] + http_range_from_part(part)[1] - 1) - + "/*" - for part in range(num_completed_parts) + [ + await responses[i].read() == segment.parts[i].data + for i in range(len(responses)) + ] ) - parts = list(segment.parts_by_byterange.values()) - assert all( - [await responses[i].read() == parts[i].data for i in range(len(responses))] - ) - - # Make some non standard range requests. - # Request past end of previous closed segment - # Request should succeed but length will be limited to the segment length - response = await hls_client.get( - "/segment/0.m4s", - headers={"Range": f"bytes=0-{hls.get_segment(0).data_size+1}"}, - ) - assert response.status == 206 - assert ( - response.headers["Content-Range"] - == f"bytes 0-{hls.get_segment(0).data_size-1}/{hls.get_segment(0).data_size}" - ) - assert (await response.read()) == hls.get_segment(0).get_data() - - # Request with start range past end of current segment - # Since this is beyond the data we have (the largest starting position will be - # from a hinted request, and even that will have a starting position at - # segment.data_size), we expect a 416. - response = await hls_client.get( - "/segment/1.m4s", - headers={"Range": f"bytes={segment.data_size+1}-{VERY_LARGE_LAST_BYTE_POS}"}, - ) - assert response.status == 416 # Request for next segment which has not yet been hinted (we will only hint # for this segment after segment 1 is complete). # This should fail, but it will hold for one more part_put before failing. hls_sync.reset_request_pool(1) - request = asyncio.create_task( - hls_client.get( - "/segment/2.m4s", headers={"Range": f"bytes=0-{VERY_LARGE_LAST_BYTE_POS}"} - ) - ) + request = asyncio.create_task(hls_client.get("/segment/2.0.m4s")) await hls_sync.wait_for_handler() hls.part_put() response = await request assert response.status == 404 - # Make valid request for the current hint. This should succeed, but since - # it is open ended, it won't finish until the segment is complete. - hls_sync.reset_request_pool(1) - request_start = segment.data_size - request = asyncio.create_task( - hls_client.get( - "/segment/1.m4s", - headers={"Range": f"bytes={request_start}-{VERY_LARGE_LAST_BYTE_POS}"}, - ) - ) # Put the remaining parts and complete the segment while remaining_parts: await hls_sync.wait_for_handler() @@ -686,26 +625,11 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync) segment.async_add_part(remaining_parts.pop(0), 0) hls.part_put() complete_segment(segment) - # Check the response - response = await request - assert response.status == 206 - assert ( - response.headers["Content-Range"] - == f"bytes {request_start}-{VERY_LARGE_LAST_BYTE_POS}/*" - ) - assert await response.read() == SEQUENCE_BYTES[request_start:] # Now the hint should have moved to segment 2 # The request for segment 2 which failed before should work now - # Also make an equivalent request with no Range parameters that - # will return the same content but with different headers - hls_sync.reset_request_pool(2) - requests = asyncio.gather( - hls_client.get( - "/segment/2.m4s", headers={"Range": f"bytes=0-{VERY_LARGE_LAST_BYTE_POS}"} - ), - hls_client.get("/segment/2.m4s"), - ) + hls_sync.reset_request_pool(1) + request = asyncio.create_task(hls_client.get("/segment/2.0.m4s")) # Put an entire segment and its parts. segment = create_segment(sequence=2) hls.put(segment) @@ -716,16 +640,11 @@ async def test_get_part_segments(hass, hls_stream, stream_worker_sync, hls_sync) hls.part_put() complete_segment(segment) # Check the response - responses = await requests - assert responses[0].status == 206 + response = await request + assert response.status == 200 assert ( - responses[0].headers["Content-Range"] == f"bytes 0-{VERY_LARGE_LAST_BYTE_POS}/*" - ) - assert responses[1].status == 200 - assert "Content-Range" not in responses[1].headers - assert ( - await response.read() == ALT_SEQUENCE_BYTES[: hls.get_segment(2).data_size] - for response in responses + await response.read() + == ALT_SEQUENCE_BYTES[: len(hls.get_segment(2).parts[0].data)] ) stream_worker_sync.resume() diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index b8521205920..ba35b5a4b72 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -126,14 +126,14 @@ def add_parts_to_segment(segment, source): """Add relevant part data to segment for testing recorder.""" moof_locs = list(find_box(source.getbuffer(), b"moof")) + [len(source.getbuffer())] segment.init = source.getbuffer()[: moof_locs[0]].tobytes() - segment.parts_by_byterange = { - moof_locs[i]: Part( + segment.parts = [ + Part( duration=None, has_keyframe=None, data=source.getbuffer()[moof_locs[i] : moof_locs[i + 1]], ) for i in range(len(moof_locs) - 1) - } + ] async def test_recorder_save(tmpdir): diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 16412b28468..e353f950aea 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -699,7 +699,7 @@ async def test_durations(hass, record_worker_sync): # check that the Part duration metadata matches the durations in the media running_metadata_duration = 0 for segment in complete_segments: - for part in segment.parts_by_byterange.values(): + for part in segment.parts: av_part = av.open(io.BytesIO(segment.init + part.data)) running_metadata_duration += part.duration # av_part.duration will just return the largest dts in av_part. @@ -713,7 +713,7 @@ async def test_durations(hass, record_worker_sync): # check that the Part durations are consistent with the Segment durations for segment in complete_segments: assert math.isclose( - sum(part.duration for part in segment.parts_by_byterange.values()), + sum(part.duration for part in segment.parts), segment.duration, abs_tol=1e-6, ) @@ -751,7 +751,7 @@ async def test_has_keyframe(hass, record_worker_sync): # check that the Part has_keyframe metadata matches the keyframes in the media for segment in complete_segments: - for part in segment.parts_by_byterange.values(): + for part in segment.parts: av_part = av.open(io.BytesIO(segment.init + part.data)) media_has_keyframe = any( packet.is_keyframe for packet in av_part.demux(av_part.streams.video[0])