diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 20ff8210996..62d13321f91 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -21,15 +21,19 @@ NUM_PLAYLIST_SEGMENTS = 3 # Number of segments to use in HLS playlist MAX_SEGMENTS = 4 # Max number of segments to keep around TARGET_SEGMENT_DURATION = 2.0 # Each segment is about this many seconds SEGMENT_DURATION_ADJUSTER = 0.1 # Used to avoid missing keyframe boundaries -MIN_SEGMENT_DURATION = ( - TARGET_SEGMENT_DURATION - SEGMENT_DURATION_ADJUSTER -) # Each segment is at least this many seconds +# Each segment is at least this many seconds +MIN_SEGMENT_DURATION = TARGET_SEGMENT_DURATION - SEGMENT_DURATION_ADJUSTER + +# Number of target durations to start before the end of the playlist. +# 1.5 should put us in the middle of the second to last segment even with +# variable keyframe intervals. +EXT_X_START = 1.5 PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 10 to 50000 is probably reasonable MAX_MISSING_DTS = 6 # Number of packets missing DTS to allow -STREAM_TIMEOUT = 30 # Timeout for reading stream +SOURCE_TIMEOUT = 30 # Timeout for reading stream source STREAM_RESTART_INCREMENT = 10 # Increase wait_timeout by this amount each retry STREAM_RESTART_RESET_TIME = 300 # Reset wait_timeout after this many seconds diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 76fae3cdacf..fcbc59ecdf3 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio from collections import deque +import datetime from typing import Callable from aiohttp import web @@ -30,6 +31,7 @@ class Segment: duration: float = attr.ib() # For detecting discontinuities across stream restarts stream_id: int = attr.ib(default=0) + start_time: datetime.datetime = attr.ib(factory=datetime.datetime.utcnow) class IdleTimer: @@ -83,7 +85,6 @@ class StreamOutput: """Initialize a stream output.""" self._hass = hass self._idle_timer = idle_timer - self._cursor: int | None = None self._event = asyncio.Event() self._segments: deque[Segment] = deque(maxlen=deque_maxlen) @@ -109,6 +110,13 @@ class StreamOutput: """Return current sequence from segments.""" return [s.sequence for s in self._segments] + @property + def last_segment(self) -> Segment | None: + """Return the last segment without iterating.""" + if self._segments: + return self._segments[-1] + return None + @property def target_duration(self) -> int: """Return the max duration of any given segment in seconds.""" @@ -120,8 +128,6 @@ class StreamOutput: def get_segment(self, sequence: int) -> Segment | None: """Retrieve a specific segment.""" - self._idle_timer.awake() - for segment in self._segments: if segment.sequence == sequence: return segment @@ -129,20 +135,13 @@ class StreamOutput: def get_segments(self) -> deque[Segment]: """Retrieve all segments.""" - self._idle_timer.awake() return self._segments - async def recv(self) -> Segment | None: + async def recv(self) -> bool: """Wait for and retrieve the latest segment.""" - if self._cursor is None or self._cursor <= self.last_sequence: - await self._event.wait() - - if not self._segments: - return None - - segment = self.get_segments()[-1] - self._cursor = segment.sequence - return segment + self._idle_timer.awake() + await self._event.wait() + return self.last_segment is not None def put(self, segment: Segment) -> None: """Store output.""" diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 7b5185da6bf..be3598edb36 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -4,6 +4,7 @@ from aiohttp import web from homeassistant.core import callback from .const import ( + EXT_X_START, FORMAT_CONTENT_TYPE, HLS_PROVIDER, MAX_SEGMENTS, @@ -70,7 +71,7 @@ class HlsPlaylistView(StreamView): def render_preamble(track): """Render preamble.""" return [ - "#EXT-X-VERSION:7", + "#EXT-X-VERSION:6", f"#EXT-X-TARGETDURATION:{track.target_duration}", '#EXT-X-MAP:URI="init.mp4"', ] @@ -83,15 +84,31 @@ class HlsPlaylistView(StreamView): if not segments: return [] + first_segment = segments[0] playlist = [ - f"#EXT-X-MEDIA-SEQUENCE:{segments[0].sequence}", - f"#EXT-X-DISCONTINUITY-SEQUENCE:{segments[0].stream_id}", + f"#EXT-X-MEDIA-SEQUENCE:{first_segment.sequence}", + f"#EXT-X-DISCONTINUITY-SEQUENCE:{first_segment.stream_id}", + "#EXT-X-PROGRAM-DATE-TIME:" + + first_segment.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + + "Z", + # Since our window doesn't have many segments, we don't want to start + # at the beginning or we risk a behind live window exception in exoplayer. + # EXT-X-START is not supposed to be within 3 target durations of the end, + # but this seems ok + f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START * track.target_duration:.3f},PRECISE=YES", ] - last_stream_id = segments[0].stream_id + last_stream_id = first_segment.stream_id for segment in segments: if last_stream_id != segment.stream_id: - playlist.append("#EXT-X-DISCONTINUITY") + playlist.extend( + [ + "#EXT-X-DISCONTINUITY", + "#EXT-X-PROGRAM-DATE-TIME:" + + segment.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + + "Z", + ] + ) playlist.extend( [ f"#EXTINF:{float(segment.duration):.04f},", @@ -115,7 +132,11 @@ class HlsPlaylistView(StreamView): if not track.sequences and not await track.recv(): return web.HTTPNotFound() headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]} - return web.Response(body=self.render(track).encode("utf-8"), headers=headers) + response = web.Response( + body=self.render(track).encode("utf-8"), headers=headers + ) + response.enable_compression(web.ContentCoding.gzip) + return response class HlsInitView(StreamView): @@ -128,8 +149,7 @@ class HlsInitView(StreamView): async def handle(self, request, stream, sequence): """Return init.mp4.""" track = stream.add_provider(HLS_PROVIDER) - segments = track.get_segments() - if not segments: + if not (segments := track.get_segments()): return web.HTTPNotFound() headers = {"Content-Type": "video/mp4"} return web.Response(body=segments[0].init, headers=headers) @@ -145,8 +165,7 @@ class HlsSegmentView(StreamView): async def handle(self, request, stream, sequence): """Return fmp4 segment.""" track = stream.add_provider(HLS_PROVIDER) - segment = track.get_segment(int(sequence)) - if not segment: + if not (segment := track.get_segment(int(sequence))): return web.HTTPNotFound() headers = {"Content-Type": "video/iso.segment"} return web.Response( diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 6fe339c5dea..c606d1ad0dc 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -16,7 +16,7 @@ from .const import ( MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO, SEGMENT_CONTAINER_FORMAT, - STREAM_TIMEOUT, + SOURCE_TIMEOUT, ) from .core import Segment, StreamOutput from .fmp4utils import get_init_and_moof_data @@ -149,7 +149,7 @@ def stream_worker(source, options, segment_buffer, quit_event): # noqa: C901 """Handle consuming streams.""" try: - container = av.open(source, options=options, timeout=STREAM_TIMEOUT) + container = av.open(source, options=options, timeout=SOURCE_TIMEOUT) except av.AVError: _LOGGER.error("Error opening stream %s", redact_credentials(str(source))) return diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index 3e4b81bcc25..a31c686dcaf 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -1,5 +1,5 @@ """The tests for hls streams.""" -from datetime import timedelta +from datetime import datetime, timedelta from unittest.mock import patch from urllib.parse import urlparse @@ -23,9 +23,10 @@ from tests.components.stream.common import generate_h264_video STREAM_SOURCE = "some-stream-source" INIT_BYTES = b"init" MOOF_BYTES = b"some-bytes" -DURATION = 10 +SEGMENT_DURATION = 10 TEST_TIMEOUT = 5.0 # Lower than 9s home assistant timeout MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever +FAKE_TIME = datetime.utcnow() class HlsClient: @@ -61,7 +62,14 @@ def make_segment(segment, discontinuity=False): """Create a playlist response for a segment.""" response = [] if discontinuity: - response.append("#EXT-X-DISCONTINUITY") + response.extend( + [ + "#EXT-X-DISCONTINUITY", + "#EXT-X-PROGRAM-DATE-TIME:" + + FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + + "Z", + ] + ) response.extend(["#EXTINF:10.0000,", f"./segment/{segment}.m4s"]), return "\n".join(response) @@ -70,11 +78,15 @@ def make_playlist(sequence, discontinuity_sequence=0, segments=[]): """Create a an hls playlist response for tests to assert on.""" response = [ "#EXTM3U", - "#EXT-X-VERSION:7", + "#EXT-X-VERSION:6", "#EXT-X-TARGETDURATION:10", '#EXT-X-MAP:URI="init.mp4"', f"#EXT-X-MEDIA-SEQUENCE:{sequence}", f"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuity_sequence}", + "#EXT-X-PROGRAM-DATE-TIME:" + + FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + + "Z", + f"#EXT-X-START:TIME-OFFSET=-{1.5*SEGMENT_DURATION:.3f},PRECISE=YES", ] response.extend(segments) response.append("") @@ -252,7 +264,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync): stream_worker_sync.pause() hls = stream.add_provider(HLS_PROVIDER) - hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION)) + hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, SEGMENT_DURATION, start_time=FAKE_TIME)) await hass.async_block_till_done() hls_client = await hls_stream(stream) @@ -261,7 +273,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync): assert resp.status == 200 assert await resp.text() == make_playlist(sequence=1, segments=[make_segment(1)]) - hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, DURATION)) + hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, SEGMENT_DURATION, start_time=FAKE_TIME)) await hass.async_block_till_done() resp = await hls_client.get("/playlist.m3u8") assert resp.status == 200 @@ -285,7 +297,15 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync): # Produce enough segments to overfill the output buffer by one for sequence in range(1, MAX_SEGMENTS + 2): - hls.put(Segment(sequence, INIT_BYTES, MOOF_BYTES, DURATION)) + hls.put( + Segment( + sequence, + INIT_BYTES, + MOOF_BYTES, + SEGMENT_DURATION, + start_time=FAKE_TIME, + ) + ) await hass.async_block_till_done() resp = await hls_client.get("/playlist.m3u8") @@ -322,9 +342,36 @@ async def test_hls_playlist_view_discontinuity(hass, hls_stream, stream_worker_s stream_worker_sync.pause() hls = stream.add_provider(HLS_PROVIDER) - hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) - hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) - hls.put(Segment(3, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=1)) + hls.put( + Segment( + 1, + INIT_BYTES, + MOOF_BYTES, + SEGMENT_DURATION, + stream_id=0, + start_time=FAKE_TIME, + ) + ) + hls.put( + Segment( + 2, + INIT_BYTES, + MOOF_BYTES, + SEGMENT_DURATION, + stream_id=0, + start_time=FAKE_TIME, + ) + ) + hls.put( + Segment( + 3, + INIT_BYTES, + MOOF_BYTES, + SEGMENT_DURATION, + stream_id=1, + start_time=FAKE_TIME, + ) + ) await hass.async_block_till_done() hls_client = await hls_stream(stream) @@ -354,11 +401,29 @@ async def test_hls_max_segments_discontinuity(hass, hls_stream, stream_worker_sy hls_client = await hls_stream(stream) - hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) + hls.put( + Segment( + 1, + INIT_BYTES, + MOOF_BYTES, + SEGMENT_DURATION, + stream_id=0, + start_time=FAKE_TIME, + ) + ) # Produce enough segments to overfill the output buffer by one for sequence in range(1, MAX_SEGMENTS + 2): - hls.put(Segment(sequence, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=1)) + hls.put( + Segment( + sequence, + INIT_BYTES, + MOOF_BYTES, + SEGMENT_DURATION, + stream_id=1, + start_time=FAKE_TIME, + ) + ) await hass.async_block_till_done() resp = await hls_client.get("/playlist.m3u8") diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index 216e02a95b9..d45dd0cbca7 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -256,8 +256,8 @@ async def test_record_stream_audio( recorder = stream.add_provider(RECORDER_PROVIDER) while True: - segment = await recorder.recv() - if not segment: + await recorder.recv() + if not (segment := recorder.last_segment): break last_segment = segment stream_worker_sync.resume()