Update HLS playlist in stream (#51191)

* Enable gzip encoding for playlist responses
* Add EXT-X-PROGRAM-DATE-TIME to playlist
* Add EXT-X-START to playlist
* Change EXT-X-VERSION from 7 to 6
* Move idle timer call to recv
* Refactor recv to remove cursor and return bool
* Rename STREAM_TIMEOUT to SOURCE_TIMEOUT
This commit is contained in:
uvjustin 2021-05-30 11:41:23 +08:00 committed by GitHub
parent 2077efb207
commit 3ca7eb9440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 131 additions and 44 deletions

View File

@ -21,15 +21,19 @@ NUM_PLAYLIST_SEGMENTS = 3 # Number of segments to use in HLS playlist
MAX_SEGMENTS = 4 # Max number of segments to keep around MAX_SEGMENTS = 4 # Max number of segments to keep around
TARGET_SEGMENT_DURATION = 2.0 # Each segment is about this many seconds TARGET_SEGMENT_DURATION = 2.0 # Each segment is about this many seconds
SEGMENT_DURATION_ADJUSTER = 0.1 # Used to avoid missing keyframe boundaries SEGMENT_DURATION_ADJUSTER = 0.1 # Used to avoid missing keyframe boundaries
MIN_SEGMENT_DURATION = ( # Each segment is at least this many seconds
TARGET_SEGMENT_DURATION - SEGMENT_DURATION_ADJUSTER MIN_SEGMENT_DURATION = TARGET_SEGMENT_DURATION - SEGMENT_DURATION_ADJUSTER
) # Each segment is at least this many seconds
# Number of target durations to start before the end of the playlist.
# 1.5 should put us in the middle of the second to last segment even with
# variable keyframe intervals.
EXT_X_START = 1.5
PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio
MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 10 to 50000 is probably reasonable MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 10 to 50000 is probably reasonable
MAX_MISSING_DTS = 6 # Number of packets missing DTS to allow MAX_MISSING_DTS = 6 # Number of packets missing DTS to allow
STREAM_TIMEOUT = 30 # Timeout for reading stream SOURCE_TIMEOUT = 30 # Timeout for reading stream source
STREAM_RESTART_INCREMENT = 10 # Increase wait_timeout by this amount each retry STREAM_RESTART_INCREMENT = 10 # Increase wait_timeout by this amount each retry
STREAM_RESTART_RESET_TIME = 300 # Reset wait_timeout after this many seconds STREAM_RESTART_RESET_TIME = 300 # Reset wait_timeout after this many seconds

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio import asyncio
from collections import deque from collections import deque
import datetime
from typing import Callable from typing import Callable
from aiohttp import web from aiohttp import web
@ -30,6 +31,7 @@ class Segment:
duration: float = attr.ib() duration: float = attr.ib()
# For detecting discontinuities across stream restarts # For detecting discontinuities across stream restarts
stream_id: int = attr.ib(default=0) stream_id: int = attr.ib(default=0)
start_time: datetime.datetime = attr.ib(factory=datetime.datetime.utcnow)
class IdleTimer: class IdleTimer:
@ -83,7 +85,6 @@ class StreamOutput:
"""Initialize a stream output.""" """Initialize a stream output."""
self._hass = hass self._hass = hass
self._idle_timer = idle_timer self._idle_timer = idle_timer
self._cursor: int | None = None
self._event = asyncio.Event() self._event = asyncio.Event()
self._segments: deque[Segment] = deque(maxlen=deque_maxlen) self._segments: deque[Segment] = deque(maxlen=deque_maxlen)
@ -109,6 +110,13 @@ class StreamOutput:
"""Return current sequence from segments.""" """Return current sequence from segments."""
return [s.sequence for s in self._segments] return [s.sequence for s in self._segments]
@property
def last_segment(self) -> Segment | None:
"""Return the last segment without iterating."""
if self._segments:
return self._segments[-1]
return None
@property @property
def target_duration(self) -> int: def target_duration(self) -> int:
"""Return the max duration of any given segment in seconds.""" """Return the max duration of any given segment in seconds."""
@ -120,8 +128,6 @@ class StreamOutput:
def get_segment(self, sequence: int) -> Segment | None: def get_segment(self, sequence: int) -> Segment | None:
"""Retrieve a specific segment.""" """Retrieve a specific segment."""
self._idle_timer.awake()
for segment in self._segments: for segment in self._segments:
if segment.sequence == sequence: if segment.sequence == sequence:
return segment return segment
@ -129,20 +135,13 @@ class StreamOutput:
def get_segments(self) -> deque[Segment]: def get_segments(self) -> deque[Segment]:
"""Retrieve all segments.""" """Retrieve all segments."""
self._idle_timer.awake()
return self._segments return self._segments
async def recv(self) -> Segment | None: async def recv(self) -> bool:
"""Wait for and retrieve the latest segment.""" """Wait for and retrieve the latest segment."""
if self._cursor is None or self._cursor <= self.last_sequence: self._idle_timer.awake()
await self._event.wait() await self._event.wait()
return self.last_segment is not None
if not self._segments:
return None
segment = self.get_segments()[-1]
self._cursor = segment.sequence
return segment
def put(self, segment: Segment) -> None: def put(self, segment: Segment) -> None:
"""Store output.""" """Store output."""

View File

@ -4,6 +4,7 @@ from aiohttp import web
from homeassistant.core import callback from homeassistant.core import callback
from .const import ( from .const import (
EXT_X_START,
FORMAT_CONTENT_TYPE, FORMAT_CONTENT_TYPE,
HLS_PROVIDER, HLS_PROVIDER,
MAX_SEGMENTS, MAX_SEGMENTS,
@ -70,7 +71,7 @@ class HlsPlaylistView(StreamView):
def render_preamble(track): def render_preamble(track):
"""Render preamble.""" """Render preamble."""
return [ return [
"#EXT-X-VERSION:7", "#EXT-X-VERSION:6",
f"#EXT-X-TARGETDURATION:{track.target_duration}", f"#EXT-X-TARGETDURATION:{track.target_duration}",
'#EXT-X-MAP:URI="init.mp4"', '#EXT-X-MAP:URI="init.mp4"',
] ]
@ -83,15 +84,31 @@ class HlsPlaylistView(StreamView):
if not segments: if not segments:
return [] return []
first_segment = segments[0]
playlist = [ playlist = [
f"#EXT-X-MEDIA-SEQUENCE:{segments[0].sequence}", f"#EXT-X-MEDIA-SEQUENCE:{first_segment.sequence}",
f"#EXT-X-DISCONTINUITY-SEQUENCE:{segments[0].stream_id}", f"#EXT-X-DISCONTINUITY-SEQUENCE:{first_segment.stream_id}",
"#EXT-X-PROGRAM-DATE-TIME:"
+ first_segment.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+ "Z",
# Since our window doesn't have many segments, we don't want to start
# at the beginning or we risk a behind live window exception in exoplayer.
# EXT-X-START is not supposed to be within 3 target durations of the end,
# but this seems ok
f"#EXT-X-START:TIME-OFFSET=-{EXT_X_START * track.target_duration:.3f},PRECISE=YES",
] ]
last_stream_id = segments[0].stream_id last_stream_id = first_segment.stream_id
for segment in segments: for segment in segments:
if last_stream_id != segment.stream_id: if last_stream_id != segment.stream_id:
playlist.append("#EXT-X-DISCONTINUITY") playlist.extend(
[
"#EXT-X-DISCONTINUITY",
"#EXT-X-PROGRAM-DATE-TIME:"
+ segment.start_time.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+ "Z",
]
)
playlist.extend( playlist.extend(
[ [
f"#EXTINF:{float(segment.duration):.04f},", f"#EXTINF:{float(segment.duration):.04f},",
@ -115,7 +132,11 @@ class HlsPlaylistView(StreamView):
if not track.sequences and not await track.recv(): if not track.sequences and not await track.recv():
return web.HTTPNotFound() return web.HTTPNotFound()
headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]} headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]}
return web.Response(body=self.render(track).encode("utf-8"), headers=headers) response = web.Response(
body=self.render(track).encode("utf-8"), headers=headers
)
response.enable_compression(web.ContentCoding.gzip)
return response
class HlsInitView(StreamView): class HlsInitView(StreamView):
@ -128,8 +149,7 @@ class HlsInitView(StreamView):
async def handle(self, request, stream, sequence): async def handle(self, request, stream, sequence):
"""Return init.mp4.""" """Return init.mp4."""
track = stream.add_provider(HLS_PROVIDER) track = stream.add_provider(HLS_PROVIDER)
segments = track.get_segments() if not (segments := track.get_segments()):
if not segments:
return web.HTTPNotFound() return web.HTTPNotFound()
headers = {"Content-Type": "video/mp4"} headers = {"Content-Type": "video/mp4"}
return web.Response(body=segments[0].init, headers=headers) return web.Response(body=segments[0].init, headers=headers)
@ -145,8 +165,7 @@ class HlsSegmentView(StreamView):
async def handle(self, request, stream, sequence): async def handle(self, request, stream, sequence):
"""Return fmp4 segment.""" """Return fmp4 segment."""
track = stream.add_provider(HLS_PROVIDER) track = stream.add_provider(HLS_PROVIDER)
segment = track.get_segment(int(sequence)) if not (segment := track.get_segment(int(sequence))):
if not segment:
return web.HTTPNotFound() return web.HTTPNotFound()
headers = {"Content-Type": "video/iso.segment"} headers = {"Content-Type": "video/iso.segment"}
return web.Response( return web.Response(

View File

@ -16,7 +16,7 @@ from .const import (
MIN_SEGMENT_DURATION, MIN_SEGMENT_DURATION,
PACKETS_TO_WAIT_FOR_AUDIO, PACKETS_TO_WAIT_FOR_AUDIO,
SEGMENT_CONTAINER_FORMAT, SEGMENT_CONTAINER_FORMAT,
STREAM_TIMEOUT, SOURCE_TIMEOUT,
) )
from .core import Segment, StreamOutput from .core import Segment, StreamOutput
from .fmp4utils import get_init_and_moof_data from .fmp4utils import get_init_and_moof_data
@ -149,7 +149,7 @@ def stream_worker(source, options, segment_buffer, quit_event): # noqa: C901
"""Handle consuming streams.""" """Handle consuming streams."""
try: try:
container = av.open(source, options=options, timeout=STREAM_TIMEOUT) container = av.open(source, options=options, timeout=SOURCE_TIMEOUT)
except av.AVError: except av.AVError:
_LOGGER.error("Error opening stream %s", redact_credentials(str(source))) _LOGGER.error("Error opening stream %s", redact_credentials(str(source)))
return return

View File

@ -1,5 +1,5 @@
"""The tests for hls streams.""" """The tests for hls streams."""
from datetime import timedelta from datetime import datetime, timedelta
from unittest.mock import patch from unittest.mock import patch
from urllib.parse import urlparse from urllib.parse import urlparse
@ -23,9 +23,10 @@ from tests.components.stream.common import generate_h264_video
STREAM_SOURCE = "some-stream-source" STREAM_SOURCE = "some-stream-source"
INIT_BYTES = b"init" INIT_BYTES = b"init"
MOOF_BYTES = b"some-bytes" MOOF_BYTES = b"some-bytes"
DURATION = 10 SEGMENT_DURATION = 10
TEST_TIMEOUT = 5.0 # Lower than 9s home assistant timeout TEST_TIMEOUT = 5.0 # Lower than 9s home assistant timeout
MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever
FAKE_TIME = datetime.utcnow()
class HlsClient: class HlsClient:
@ -61,7 +62,14 @@ def make_segment(segment, discontinuity=False):
"""Create a playlist response for a segment.""" """Create a playlist response for a segment."""
response = [] response = []
if discontinuity: if discontinuity:
response.append("#EXT-X-DISCONTINUITY") response.extend(
[
"#EXT-X-DISCONTINUITY",
"#EXT-X-PROGRAM-DATE-TIME:"
+ FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+ "Z",
]
)
response.extend(["#EXTINF:10.0000,", f"./segment/{segment}.m4s"]), response.extend(["#EXTINF:10.0000,", f"./segment/{segment}.m4s"]),
return "\n".join(response) return "\n".join(response)
@ -70,11 +78,15 @@ def make_playlist(sequence, discontinuity_sequence=0, segments=[]):
"""Create a an hls playlist response for tests to assert on.""" """Create a an hls playlist response for tests to assert on."""
response = [ response = [
"#EXTM3U", "#EXTM3U",
"#EXT-X-VERSION:7", "#EXT-X-VERSION:6",
"#EXT-X-TARGETDURATION:10", "#EXT-X-TARGETDURATION:10",
'#EXT-X-MAP:URI="init.mp4"', '#EXT-X-MAP:URI="init.mp4"',
f"#EXT-X-MEDIA-SEQUENCE:{sequence}", f"#EXT-X-MEDIA-SEQUENCE:{sequence}",
f"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuity_sequence}", f"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuity_sequence}",
"#EXT-X-PROGRAM-DATE-TIME:"
+ FAKE_TIME.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+ "Z",
f"#EXT-X-START:TIME-OFFSET=-{1.5*SEGMENT_DURATION:.3f},PRECISE=YES",
] ]
response.extend(segments) response.extend(segments)
response.append("") response.append("")
@ -252,7 +264,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync):
stream_worker_sync.pause() stream_worker_sync.pause()
hls = stream.add_provider(HLS_PROVIDER) hls = stream.add_provider(HLS_PROVIDER)
hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION)) hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, SEGMENT_DURATION, start_time=FAKE_TIME))
await hass.async_block_till_done() await hass.async_block_till_done()
hls_client = await hls_stream(stream) hls_client = await hls_stream(stream)
@ -261,7 +273,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync):
assert resp.status == 200 assert resp.status == 200
assert await resp.text() == make_playlist(sequence=1, segments=[make_segment(1)]) assert await resp.text() == make_playlist(sequence=1, segments=[make_segment(1)])
hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, DURATION)) hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, SEGMENT_DURATION, start_time=FAKE_TIME))
await hass.async_block_till_done() await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8") resp = await hls_client.get("/playlist.m3u8")
assert resp.status == 200 assert resp.status == 200
@ -285,7 +297,15 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync):
# Produce enough segments to overfill the output buffer by one # Produce enough segments to overfill the output buffer by one
for sequence in range(1, MAX_SEGMENTS + 2): for sequence in range(1, MAX_SEGMENTS + 2):
hls.put(Segment(sequence, INIT_BYTES, MOOF_BYTES, DURATION)) hls.put(
Segment(
sequence,
INIT_BYTES,
MOOF_BYTES,
SEGMENT_DURATION,
start_time=FAKE_TIME,
)
)
await hass.async_block_till_done() await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8") resp = await hls_client.get("/playlist.m3u8")
@ -322,9 +342,36 @@ async def test_hls_playlist_view_discontinuity(hass, hls_stream, stream_worker_s
stream_worker_sync.pause() stream_worker_sync.pause()
hls = stream.add_provider(HLS_PROVIDER) hls = stream.add_provider(HLS_PROVIDER)
hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) hls.put(
hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) Segment(
hls.put(Segment(3, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=1)) 1,
INIT_BYTES,
MOOF_BYTES,
SEGMENT_DURATION,
stream_id=0,
start_time=FAKE_TIME,
)
)
hls.put(
Segment(
2,
INIT_BYTES,
MOOF_BYTES,
SEGMENT_DURATION,
stream_id=0,
start_time=FAKE_TIME,
)
)
hls.put(
Segment(
3,
INIT_BYTES,
MOOF_BYTES,
SEGMENT_DURATION,
stream_id=1,
start_time=FAKE_TIME,
)
)
await hass.async_block_till_done() await hass.async_block_till_done()
hls_client = await hls_stream(stream) hls_client = await hls_stream(stream)
@ -354,11 +401,29 @@ async def test_hls_max_segments_discontinuity(hass, hls_stream, stream_worker_sy
hls_client = await hls_stream(stream) hls_client = await hls_stream(stream)
hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) hls.put(
Segment(
1,
INIT_BYTES,
MOOF_BYTES,
SEGMENT_DURATION,
stream_id=0,
start_time=FAKE_TIME,
)
)
# Produce enough segments to overfill the output buffer by one # Produce enough segments to overfill the output buffer by one
for sequence in range(1, MAX_SEGMENTS + 2): for sequence in range(1, MAX_SEGMENTS + 2):
hls.put(Segment(sequence, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=1)) hls.put(
Segment(
sequence,
INIT_BYTES,
MOOF_BYTES,
SEGMENT_DURATION,
stream_id=1,
start_time=FAKE_TIME,
)
)
await hass.async_block_till_done() await hass.async_block_till_done()
resp = await hls_client.get("/playlist.m3u8") resp = await hls_client.get("/playlist.m3u8")

View File

@ -256,8 +256,8 @@ async def test_record_stream_audio(
recorder = stream.add_provider(RECORDER_PROVIDER) recorder = stream.add_provider(RECORDER_PROVIDER)
while True: while True:
segment = await recorder.recv() await recorder.recv()
if not segment: if not (segment := recorder.last_segment):
break break
last_segment = segment last_segment = segment
stream_worker_sync.resume() stream_worker_sync.resume()