Add H.265 support to stream component (#38125)

* Add H.265 support to stream component

* Change find_box to generator

* Move fmp4 utilities to fmp4utils.py

* Add minimum segments and segment durations

* Remove MIN_SEGMENTS

* Fix when container_options is None

* Fix missing num_segments and update tests

* Remove unnecessary mock attribute

* Fix Segment construction in test_recorder_save

* fix recorder with lookback

Co-authored-by: Jason Hunter <hunterjm@gmail.com>
This commit is contained in:
uvjustin 2020-08-12 05:12:41 +08:00 committed by GitHub
parent d0a59e28ac
commit 5355fcaba8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 169 additions and 51 deletions

View File

@ -18,6 +18,7 @@ from .const import (
CONF_LOOKBACK,
CONF_STREAM_SOURCE,
DOMAIN,
MAX_SEGMENTS,
SERVICE_RECORD,
)
from .core import PROVIDERS
@ -225,7 +226,7 @@ async def async_handle_record_service(hass, call):
# Take advantage of lookback
hls = stream.outputs.get("hls")
if lookback > 0 and hls:
num_segments = min(int(lookback // hls.target_duration), hls.num_segments)
num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS)
# Wait for latest segment, then add the lookback
await hls.recv()
recorder.prepend(list(hls.get_segment())[-num_segments:])

View File

@ -16,3 +16,6 @@ OUTPUT_FORMATS = ["hls"]
FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"}
AUDIO_SAMPLE_RATE = 44100
MAX_SEGMENTS = 3 # Max number of segments to keep around
MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds

View File

@ -12,7 +12,7 @@ from homeassistant.core import callback
from homeassistant.helpers.event import async_call_later
from homeassistant.util.decorator import Registry
from .const import ATTR_STREAMS, DOMAIN
from .const import ATTR_STREAMS, DOMAIN, MAX_SEGMENTS
PROVIDERS = Registry()
@ -34,13 +34,12 @@ class Segment:
sequence: int = attr.ib()
segment: io.BytesIO = attr.ib()
duration: float = attr.ib()
start_pts: tuple = attr.ib()
class StreamOutput:
"""Represents a stream output."""
num_segments = 3
def __init__(self, stream, timeout: int = 300) -> None:
"""Initialize a stream output."""
self.idle = False
@ -48,7 +47,7 @@ class StreamOutput:
self._stream = stream
self._cursor = None
self._event = asyncio.Event()
self._segments = deque(maxlen=self.num_segments)
self._segments = deque(maxlen=MAX_SEGMENTS)
self._unsub = None
@property
@ -67,8 +66,13 @@ class StreamOutput:
return None
@property
def video_codec(self) -> str:
"""Return desired video codec."""
def video_codecs(self) -> tuple:
"""Return desired video codecs."""
return None
@property
def container_options(self) -> dict:
"""Return container options."""
return None
@property
@ -78,12 +82,12 @@ class StreamOutput:
@property
def target_duration(self) -> int:
"""Return the average duration of the segments in seconds."""
"""Return the max duration of any given segment in seconds."""
segment_length = len(self._segments)
if not segment_length:
return 0
durations = [s.duration for s in self._segments]
return round(sum(durations) // segment_length) or 1
return round(max(durations)) or 1
def get_segment(self, sequence: int = None) -> Any:
"""Retrieve a specific segment, or the whole list."""
@ -147,7 +151,7 @@ class StreamOutput:
def cleanup(self):
"""Handle cleanup."""
self._segments = deque(maxlen=self.num_segments)
self._segments = deque(maxlen=MAX_SEGMENTS)
self._stream.remove_provider(self)

View File

@ -0,0 +1,50 @@
"""Utilities to help convert mp4s to fmp4s."""
import io
def find_box(segment: io.BytesIO, target_type: bytes, box_start: int = 0) -> int:
"""Find location of first box (or sub_box if box_start provided) of given type."""
if box_start == 0:
box_end = len(segment.getbuffer())
index = 0
else:
segment.seek(box_start)
box_end = box_start + int.from_bytes(segment.read(4), byteorder="big")
index = box_start + 8
while 1:
if index > box_end - 8: # End of box, not found
break
segment.seek(index)
box_header = segment.read(8)
if box_header[4:8] == target_type:
yield index
segment.seek(index)
index += int.from_bytes(box_header[0:4], byteorder="big")
def get_init(segment: io.BytesIO) -> bytes:
"""Get init section from fragmented mp4."""
moof_location = next(find_box(segment, b"moof"))
segment.seek(0)
return segment.read(moof_location)
def get_m4s(segment: io.BytesIO, start_pts: tuple, sequence: int) -> bytes:
"""Get m4s section from fragmented mp4."""
moof_location = next(find_box(segment, b"moof"))
mfra_location = next(find_box(segment, b"mfra"))
# adjust mfhd sequence number in moof
view = segment.getbuffer()
view[moof_location + 20 : moof_location + 24] = sequence.to_bytes(4, "big")
# adjust tfdt in video traf
traf_finder = find_box(segment, b"traf", moof_location)
traf_location = next(traf_finder)
tfdt_location = next(find_box(segment, b"tfdt", traf_location))
view[tfdt_location + 12 : tfdt_location + 20] = start_pts[0].to_bytes(8, "big")
# adjust tfdt in audio traf
traf_location = next(traf_finder)
tfdt_location = next(find_box(segment, b"tfdt", traf_location))
view[tfdt_location + 12 : tfdt_location + 20] = start_pts[1].to_bytes(8, "big")
# done adjusting
segment.seek(moof_location)
return segment.read(mfra_location - moof_location)

View File

@ -6,6 +6,7 @@ from homeassistant.util.dt import utcnow
from .const import FORMAT_CONTENT_TYPE
from .core import PROVIDERS, StreamOutput, StreamView
from .fmp4utils import get_init, get_m4s
@callback
@ -13,6 +14,7 @@ def async_setup_hls(hass):
"""Set up api endpoints."""
hass.http.register_view(HlsPlaylistView())
hass.http.register_view(HlsSegmentView())
hass.http.register_view(HlsInitView())
return "/api/hls/{}/playlist.m3u8"
@ -37,21 +39,41 @@ class HlsPlaylistView(StreamView):
)
class HlsSegmentView(StreamView):
"""Stream view to serve a MPEG2TS segment."""
class HlsInitView(StreamView):
"""Stream view to serve HLS init.mp4."""
url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.ts"
url = r"/api/hls/{token:[a-f0-9]+}/init.mp4"
name = "api:stream:hls:init"
cors_allowed = True
async def handle(self, request, stream, sequence):
"""Return init.mp4."""
track = stream.add_provider("hls")
segments = track.get_segment()
if not segments:
return web.HTTPNotFound()
headers = {"Content-Type": "video/mp4"}
return web.Response(body=get_init(segments[0].segment), headers=headers)
class HlsSegmentView(StreamView):
"""Stream view to serve a HLS fmp4 segment."""
url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s"
name = "api:stream:hls:segment"
cors_allowed = True
async def handle(self, request, stream, sequence):
"""Return mpegts segment."""
"""Return fmp4 segment."""
track = stream.add_provider("hls")
segment = track.get_segment(int(sequence))
if not segment:
return web.HTTPNotFound()
headers = {"Content-Type": "video/mp2t"}
return web.Response(body=segment.segment.getvalue(), headers=headers)
headers = {"Content-Type": "video/iso.segment"}
return web.Response(
body=get_m4s(segment.segment, segment.start_pts, int(sequence)),
headers=headers,
)
class M3U8Renderer:
@ -64,7 +86,12 @@ class M3U8Renderer:
@staticmethod
def render_preamble(track):
"""Render preamble."""
return ["#EXT-X-VERSION:3", f"#EXT-X-TARGETDURATION:{track.target_duration}"]
return [
"#EXT-X-VERSION:7",
f"#EXT-X-TARGETDURATION:{track.target_duration}",
'#EXT-X-MAP:URI="init.mp4"',
"#EXT-X-INDEPENDENT-SEGMENTS",
]
@staticmethod
def render_playlist(track, start_time):
@ -81,7 +108,7 @@ class M3U8Renderer:
playlist.extend(
[
"#EXTINF:{:.04f},".format(float(segment.duration)),
f"./segment/{segment.sequence}.ts",
f"./segment/{segment.sequence}.m4s",
]
)
@ -109,7 +136,7 @@ class HlsStreamOutput(StreamOutput):
@property
def format(self) -> str:
"""Return container format."""
return "mpegts"
return "mp4"
@property
def audio_codec(self) -> str:
@ -117,6 +144,11 @@ class HlsStreamOutput(StreamOutput):
return "aac"
@property
def video_codec(self) -> str:
"""Return desired video codec."""
return "h264"
def video_codecs(self) -> tuple:
"""Return desired video codecs."""
return {"hevc", "h264"}
@property
def container_options(self) -> dict:
"""Return container options."""
return {"movflags": "frag_custom+empty_moov+default_base_moof"}

View File

@ -17,14 +17,14 @@ def async_setup_recorder(hass):
def recorder_save_worker(file_out: str, segments: List[Segment]):
"""Handle saving stream."""
first_pts = None
first_pts = segments[0].start_pts[0]
output = av.open(file_out, "w")
output_v = None
for segment in segments:
# Seek to beginning and open segment
segment.segment.seek(0)
source = av.open(segment.segment, "r", format="mpegts")
source = av.open(segment.segment, "r", format="mp4")
source_v = source.streams.video[0]
# Add output streams
@ -36,9 +36,9 @@ def recorder_save_worker(file_out: str, segments: List[Segment]):
# Remux video
for packet in source.demux(source_v):
if packet is not None and packet.dts is not None:
if first_pts is None:
first_pts = packet.pts
if packet.pts < segment.start_pts[0]:
packet.pts += segment.start_pts[0]
packet.dts += segment.start_pts[0]
packet.pts -= first_pts
packet.dts -= first_pts
packet.stream = output_v
@ -67,7 +67,7 @@ class RecorderOutput(StreamOutput):
@property
def format(self) -> str:
"""Return container format."""
return "mpegts"
return "mp4"
@property
def audio_codec(self) -> str:
@ -75,9 +75,9 @@ class RecorderOutput(StreamOutput):
return "aac"
@property
def video_codec(self) -> str:
"""Return desired video codec."""
return "h264"
def video_codecs(self) -> tuple:
"""Return desired video codecs."""
return {"hevc", "h264"}
def prepend(self, segments: List[Segment]) -> None:
"""Prepend segments to existing list."""

View File

@ -5,7 +5,7 @@ import logging
import av
from .const import AUDIO_SAMPLE_RATE
from .const import AUDIO_SAMPLE_RATE, MIN_SEGMENT_DURATION
from .core import Segment, StreamBuffer
_LOGGER = logging.getLogger(__name__)
@ -29,7 +29,15 @@ def create_stream_buffer(stream_output, video_stream, audio_frame):
a_packet = None
segment = io.BytesIO()
output = av.open(segment, mode="w", format=stream_output.format)
output = av.open(
segment,
mode="w",
format=stream_output.format,
container_options={
"video_track_timescale": str(int(1 / video_stream.time_base)),
**(stream_output.container_options or {}),
},
)
vstream = output.add_stream(template=video_stream)
# Check if audio is requested
astream = None
@ -68,6 +76,9 @@ def stream_worker(hass, stream, quit_event):
last_dts = None
# Keep track of consecutive packets without a dts to detect end of stream.
last_packet_was_without_dts = False
# The pts at the beginning of the segment
segment_start_v_pts = 0
segment_start_a_pts = 0
while not quit_event.is_set():
try:
@ -99,13 +110,15 @@ def stream_worker(hass, stream, quit_event):
packet.dts -= first_pts
packet.pts -= first_pts
# Reset segment on every keyframe
if packet.is_keyframe:
# Calculate the segment duration by multiplying the presentation
# timestamp by the time base, which gets us total seconds.
# By then dividing by the sequence, we can calculate how long
# each segment is, assuming the stream starts from 0.
segment_duration = (packet.pts * packet.time_base) / sequence
# Reset segment on keyframe after we reach desired segment duration
if (
packet.is_keyframe
and (packet.pts - segment_start_v_pts) * packet.time_base
>= MIN_SEGMENT_DURATION
):
# Calculate the segment duration by multiplying the difference of the next and the current
# keyframe presentation timestamps by the time base, which gets us total seconds.
segment_duration = (packet.pts - segment_start_v_pts) * packet.time_base
# Save segment to outputs
for fmt, buffer in outputs.items():
buffer.output.close()
@ -113,17 +126,26 @@ def stream_worker(hass, stream, quit_event):
if stream.outputs.get(fmt):
hass.loop.call_soon_threadsafe(
stream.outputs[fmt].put,
Segment(sequence, buffer.segment, segment_duration),
Segment(
sequence,
buffer.segment,
segment_duration,
(segment_start_v_pts, segment_start_a_pts),
),
)
# Clear outputs and increment sequence
outputs = {}
if not first_packet:
sequence += 1
segment_start_v_pts = packet.pts
segment_start_a_pts = int(
packet.pts * packet.time_base * AUDIO_SAMPLE_RATE
)
# Initialize outputs
for stream_output in stream.outputs.values():
if video_stream.name != stream_output.video_codec:
if video_stream.name not in stream_output.video_codecs:
continue
a_packet, buffer = create_stream_buffer(

View File

@ -20,7 +20,7 @@ def generate_h264_video():
total_frames = duration * fps
output = io.BytesIO()
output.name = "test.ts"
output.name = "test.mp4"
container = av.open(output, mode="w")
stream = container.add_stream("libx264", rate=fps)

View File

@ -38,6 +38,13 @@ async def test_hls_stream(hass, hass_client):
playlist_response = await http_client.get(parsed_url.path)
assert playlist_response.status == 200
# Fetch init
playlist = await playlist_response.text()
playlist_url = "/".join(parsed_url.path.split("/")[:-1])
init_url = playlist_url + "/init.mp4"
init_response = await http_client.get(init_url)
assert init_response.status == 200
# Fetch segment
playlist = await playlist_response.text()
playlist_url = "/".join(parsed_url.path.split("/")[:-1])
@ -99,15 +106,16 @@ async def test_stream_ended(hass):
source = generate_h264_video()
stream = preload_stream(hass, source)
track = stream.add_provider("hls")
track.num_segments = 2
# Request stream
request_stream(hass, source)
# Run it dead
segments = 0
while await track.recv() is not None:
segments += 1
while True:
segment = await track.recv()
if segment is None:
break
segments = segment.sequence
assert segments > 1
assert not track.get_segment()

View File

@ -72,7 +72,6 @@ async def test_record_service_lookback(hass):
):
# Setup stubs
hls_mock = MagicMock()
hls_mock.num_segments = 3
hls_mock.target_duration = 2
hls_mock.recv = AsyncMock(return_value=None)
stream_mock.return_value.outputs = {"hls": hls_mock}

View File

@ -31,12 +31,11 @@ async def test_record_stream(hass, hass_client):
recorder = stream.add_provider("recorder")
stream.start()
segments = 0
while True:
segment = await recorder.recv()
if not segment:
break
segments += 1
segments = segment.sequence
stream.stop()
@ -76,7 +75,7 @@ async def test_recorder_save():
output.name = "test.mp4"
# Run
recorder_save_worker(output, [Segment(1, source, 4)])
recorder_save_worker(output, [Segment(1, source, 4, (360000, 176400))])
# Assert
assert output.getvalue()