From 5355fcaba8efc21ab342a5229eb30dd6d71f3285 Mon Sep 17 00:00:00 2001 From: uvjustin <46082645+uvjustin@users.noreply.github.com> Date: Wed, 12 Aug 2020 05:12:41 +0800 Subject: [PATCH] Add H.265 support to stream component (#38125) * Add H.265 support to stream component * Change find_box to generator * Move fmp4 utilities to fmp4utils.py * Add minimum segments and segment durations * Remove MIN_SEGMENTS * Fix when container_options is None * Fix missing num_segments and update tests * Remove unnecessary mock attribute * Fix Segment construction in test_recorder_save * fix recorder with lookback Co-authored-by: Jason Hunter --- homeassistant/components/stream/__init__.py | 3 +- homeassistant/components/stream/const.py | 3 ++ homeassistant/components/stream/core.py | 22 ++++---- homeassistant/components/stream/fmp4utils.py | 50 +++++++++++++++++ homeassistant/components/stream/hls.py | 56 +++++++++++++++----- homeassistant/components/stream/recorder.py | 18 +++---- homeassistant/components/stream/worker.py | 44 +++++++++++---- tests/components/stream/common.py | 2 +- tests/components/stream/test_hls.py | 16 ++++-- tests/components/stream/test_init.py | 1 - tests/components/stream/test_recorder.py | 5 +- 11 files changed, 169 insertions(+), 51 deletions(-) create mode 100644 homeassistant/components/stream/fmp4utils.py diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index a84f37ee126..aeab212b78c 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -18,6 +18,7 @@ from .const import ( CONF_LOOKBACK, CONF_STREAM_SOURCE, DOMAIN, + MAX_SEGMENTS, SERVICE_RECORD, ) from .core import PROVIDERS @@ -225,7 +226,7 @@ async def async_handle_record_service(hass, call): # Take advantage of lookback hls = stream.outputs.get("hls") if lookback > 0 and hls: - num_segments = min(int(lookback // hls.target_duration), hls.num_segments) + num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS) # Wait for latest segment, then add the lookback await hls.recv() recorder.prepend(list(hls.get_segment())[-num_segments:]) diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 2f50ff26226..7bc900c25e6 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -16,3 +16,6 @@ OUTPUT_FORMATS = ["hls"] FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"} AUDIO_SAMPLE_RATE = 44100 + +MAX_SEGMENTS = 3 # Max number of segments to keep around +MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 715ae47e133..06e1e659202 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -12,7 +12,7 @@ from homeassistant.core import callback from homeassistant.helpers.event import async_call_later from homeassistant.util.decorator import Registry -from .const import ATTR_STREAMS, DOMAIN +from .const import ATTR_STREAMS, DOMAIN, MAX_SEGMENTS PROVIDERS = Registry() @@ -34,13 +34,12 @@ class Segment: sequence: int = attr.ib() segment: io.BytesIO = attr.ib() duration: float = attr.ib() + start_pts: tuple = attr.ib() class StreamOutput: """Represents a stream output.""" - num_segments = 3 - def __init__(self, stream, timeout: int = 300) -> None: """Initialize a stream output.""" self.idle = False @@ -48,7 +47,7 @@ class StreamOutput: self._stream = stream self._cursor = None self._event = asyncio.Event() - self._segments = deque(maxlen=self.num_segments) + self._segments = deque(maxlen=MAX_SEGMENTS) self._unsub = None @property @@ -67,8 +66,13 @@ class StreamOutput: return None @property - def video_codec(self) -> str: - """Return desired video codec.""" + def video_codecs(self) -> tuple: + """Return desired video codecs.""" + return None + + @property + def container_options(self) -> dict: + """Return container options.""" return None @property @@ -78,12 +82,12 @@ class StreamOutput: @property def target_duration(self) -> int: - """Return the average duration of the segments in seconds.""" + """Return the max duration of any given segment in seconds.""" segment_length = len(self._segments) if not segment_length: return 0 durations = [s.duration for s in self._segments] - return round(sum(durations) // segment_length) or 1 + return round(max(durations)) or 1 def get_segment(self, sequence: int = None) -> Any: """Retrieve a specific segment, or the whole list.""" @@ -147,7 +151,7 @@ class StreamOutput: def cleanup(self): """Handle cleanup.""" - self._segments = deque(maxlen=self.num_segments) + self._segments = deque(maxlen=MAX_SEGMENTS) self._stream.remove_provider(self) diff --git a/homeassistant/components/stream/fmp4utils.py b/homeassistant/components/stream/fmp4utils.py new file mode 100644 index 00000000000..025e576bc06 --- /dev/null +++ b/homeassistant/components/stream/fmp4utils.py @@ -0,0 +1,50 @@ +"""Utilities to help convert mp4s to fmp4s.""" +import io + + +def find_box(segment: io.BytesIO, target_type: bytes, box_start: int = 0) -> int: + """Find location of first box (or sub_box if box_start provided) of given type.""" + if box_start == 0: + box_end = len(segment.getbuffer()) + index = 0 + else: + segment.seek(box_start) + box_end = box_start + int.from_bytes(segment.read(4), byteorder="big") + index = box_start + 8 + while 1: + if index > box_end - 8: # End of box, not found + break + segment.seek(index) + box_header = segment.read(8) + if box_header[4:8] == target_type: + yield index + segment.seek(index) + index += int.from_bytes(box_header[0:4], byteorder="big") + + +def get_init(segment: io.BytesIO) -> bytes: + """Get init section from fragmented mp4.""" + moof_location = next(find_box(segment, b"moof")) + segment.seek(0) + return segment.read(moof_location) + + +def get_m4s(segment: io.BytesIO, start_pts: tuple, sequence: int) -> bytes: + """Get m4s section from fragmented mp4.""" + moof_location = next(find_box(segment, b"moof")) + mfra_location = next(find_box(segment, b"mfra")) + # adjust mfhd sequence number in moof + view = segment.getbuffer() + view[moof_location + 20 : moof_location + 24] = sequence.to_bytes(4, "big") + # adjust tfdt in video traf + traf_finder = find_box(segment, b"traf", moof_location) + traf_location = next(traf_finder) + tfdt_location = next(find_box(segment, b"tfdt", traf_location)) + view[tfdt_location + 12 : tfdt_location + 20] = start_pts[0].to_bytes(8, "big") + # adjust tfdt in audio traf + traf_location = next(traf_finder) + tfdt_location = next(find_box(segment, b"tfdt", traf_location)) + view[tfdt_location + 12 : tfdt_location + 20] = start_pts[1].to_bytes(8, "big") + # done adjusting + segment.seek(moof_location) + return segment.read(mfra_location - moof_location) diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 2cd98c0a00f..66cf3583b60 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -6,6 +6,7 @@ from homeassistant.util.dt import utcnow from .const import FORMAT_CONTENT_TYPE from .core import PROVIDERS, StreamOutput, StreamView +from .fmp4utils import get_init, get_m4s @callback @@ -13,6 +14,7 @@ def async_setup_hls(hass): """Set up api endpoints.""" hass.http.register_view(HlsPlaylistView()) hass.http.register_view(HlsSegmentView()) + hass.http.register_view(HlsInitView()) return "/api/hls/{}/playlist.m3u8" @@ -37,21 +39,41 @@ class HlsPlaylistView(StreamView): ) -class HlsSegmentView(StreamView): - """Stream view to serve a MPEG2TS segment.""" +class HlsInitView(StreamView): + """Stream view to serve HLS init.mp4.""" - url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.ts" + url = r"/api/hls/{token:[a-f0-9]+}/init.mp4" + name = "api:stream:hls:init" + cors_allowed = True + + async def handle(self, request, stream, sequence): + """Return init.mp4.""" + track = stream.add_provider("hls") + segments = track.get_segment() + if not segments: + return web.HTTPNotFound() + headers = {"Content-Type": "video/mp4"} + return web.Response(body=get_init(segments[0].segment), headers=headers) + + +class HlsSegmentView(StreamView): + """Stream view to serve a HLS fmp4 segment.""" + + url = r"/api/hls/{token:[a-f0-9]+}/segment/{sequence:\d+}.m4s" name = "api:stream:hls:segment" cors_allowed = True async def handle(self, request, stream, sequence): - """Return mpegts segment.""" + """Return fmp4 segment.""" track = stream.add_provider("hls") segment = track.get_segment(int(sequence)) if not segment: return web.HTTPNotFound() - headers = {"Content-Type": "video/mp2t"} - return web.Response(body=segment.segment.getvalue(), headers=headers) + headers = {"Content-Type": "video/iso.segment"} + return web.Response( + body=get_m4s(segment.segment, segment.start_pts, int(sequence)), + headers=headers, + ) class M3U8Renderer: @@ -64,7 +86,12 @@ class M3U8Renderer: @staticmethod def render_preamble(track): """Render preamble.""" - return ["#EXT-X-VERSION:3", f"#EXT-X-TARGETDURATION:{track.target_duration}"] + return [ + "#EXT-X-VERSION:7", + f"#EXT-X-TARGETDURATION:{track.target_duration}", + '#EXT-X-MAP:URI="init.mp4"', + "#EXT-X-INDEPENDENT-SEGMENTS", + ] @staticmethod def render_playlist(track, start_time): @@ -81,7 +108,7 @@ class M3U8Renderer: playlist.extend( [ "#EXTINF:{:.04f},".format(float(segment.duration)), - f"./segment/{segment.sequence}.ts", + f"./segment/{segment.sequence}.m4s", ] ) @@ -109,7 +136,7 @@ class HlsStreamOutput(StreamOutput): @property def format(self) -> str: """Return container format.""" - return "mpegts" + return "mp4" @property def audio_codec(self) -> str: @@ -117,6 +144,11 @@ class HlsStreamOutput(StreamOutput): return "aac" @property - def video_codec(self) -> str: - """Return desired video codec.""" - return "h264" + def video_codecs(self) -> tuple: + """Return desired video codecs.""" + return {"hevc", "h264"} + + @property + def container_options(self) -> dict: + """Return container options.""" + return {"movflags": "frag_custom+empty_moov+default_base_moof"} diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index c28c73c64ac..90d6cdccfc1 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -17,14 +17,14 @@ def async_setup_recorder(hass): def recorder_save_worker(file_out: str, segments: List[Segment]): """Handle saving stream.""" - first_pts = None + first_pts = segments[0].start_pts[0] output = av.open(file_out, "w") output_v = None for segment in segments: # Seek to beginning and open segment segment.segment.seek(0) - source = av.open(segment.segment, "r", format="mpegts") + source = av.open(segment.segment, "r", format="mp4") source_v = source.streams.video[0] # Add output streams @@ -36,9 +36,9 @@ def recorder_save_worker(file_out: str, segments: List[Segment]): # Remux video for packet in source.demux(source_v): if packet is not None and packet.dts is not None: - if first_pts is None: - first_pts = packet.pts - + if packet.pts < segment.start_pts[0]: + packet.pts += segment.start_pts[0] + packet.dts += segment.start_pts[0] packet.pts -= first_pts packet.dts -= first_pts packet.stream = output_v @@ -67,7 +67,7 @@ class RecorderOutput(StreamOutput): @property def format(self) -> str: """Return container format.""" - return "mpegts" + return "mp4" @property def audio_codec(self) -> str: @@ -75,9 +75,9 @@ class RecorderOutput(StreamOutput): return "aac" @property - def video_codec(self) -> str: - """Return desired video codec.""" - return "h264" + def video_codecs(self) -> tuple: + """Return desired video codecs.""" + return {"hevc", "h264"} def prepend(self, segments: List[Segment]) -> None: """Prepend segments to existing list.""" diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 965f2611ed4..461d29de421 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -5,7 +5,7 @@ import logging import av -from .const import AUDIO_SAMPLE_RATE +from .const import AUDIO_SAMPLE_RATE, MIN_SEGMENT_DURATION from .core import Segment, StreamBuffer _LOGGER = logging.getLogger(__name__) @@ -29,7 +29,15 @@ def create_stream_buffer(stream_output, video_stream, audio_frame): a_packet = None segment = io.BytesIO() - output = av.open(segment, mode="w", format=stream_output.format) + output = av.open( + segment, + mode="w", + format=stream_output.format, + container_options={ + "video_track_timescale": str(int(1 / video_stream.time_base)), + **(stream_output.container_options or {}), + }, + ) vstream = output.add_stream(template=video_stream) # Check if audio is requested astream = None @@ -68,6 +76,9 @@ def stream_worker(hass, stream, quit_event): last_dts = None # Keep track of consecutive packets without a dts to detect end of stream. last_packet_was_without_dts = False + # The pts at the beginning of the segment + segment_start_v_pts = 0 + segment_start_a_pts = 0 while not quit_event.is_set(): try: @@ -99,13 +110,15 @@ def stream_worker(hass, stream, quit_event): packet.dts -= first_pts packet.pts -= first_pts - # Reset segment on every keyframe - if packet.is_keyframe: - # Calculate the segment duration by multiplying the presentation - # timestamp by the time base, which gets us total seconds. - # By then dividing by the sequence, we can calculate how long - # each segment is, assuming the stream starts from 0. - segment_duration = (packet.pts * packet.time_base) / sequence + # Reset segment on keyframe after we reach desired segment duration + if ( + packet.is_keyframe + and (packet.pts - segment_start_v_pts) * packet.time_base + >= MIN_SEGMENT_DURATION + ): + # Calculate the segment duration by multiplying the difference of the next and the current + # keyframe presentation timestamps by the time base, which gets us total seconds. + segment_duration = (packet.pts - segment_start_v_pts) * packet.time_base # Save segment to outputs for fmt, buffer in outputs.items(): buffer.output.close() @@ -113,17 +126,26 @@ def stream_worker(hass, stream, quit_event): if stream.outputs.get(fmt): hass.loop.call_soon_threadsafe( stream.outputs[fmt].put, - Segment(sequence, buffer.segment, segment_duration), + Segment( + sequence, + buffer.segment, + segment_duration, + (segment_start_v_pts, segment_start_a_pts), + ), ) # Clear outputs and increment sequence outputs = {} if not first_packet: sequence += 1 + segment_start_v_pts = packet.pts + segment_start_a_pts = int( + packet.pts * packet.time_base * AUDIO_SAMPLE_RATE + ) # Initialize outputs for stream_output in stream.outputs.values(): - if video_stream.name != stream_output.video_codec: + if video_stream.name not in stream_output.video_codecs: continue a_packet, buffer = create_stream_buffer( diff --git a/tests/components/stream/common.py b/tests/components/stream/common.py index 4c34ec0b341..354b94f7089 100644 --- a/tests/components/stream/common.py +++ b/tests/components/stream/common.py @@ -20,7 +20,7 @@ def generate_h264_video(): total_frames = duration * fps output = io.BytesIO() - output.name = "test.ts" + output.name = "test.mp4" container = av.open(output, mode="w") stream = container.add_stream("libx264", rate=fps) diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index 3de50d3309c..b4c0b0e536f 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -38,6 +38,13 @@ async def test_hls_stream(hass, hass_client): playlist_response = await http_client.get(parsed_url.path) assert playlist_response.status == 200 + # Fetch init + playlist = await playlist_response.text() + playlist_url = "/".join(parsed_url.path.split("/")[:-1]) + init_url = playlist_url + "/init.mp4" + init_response = await http_client.get(init_url) + assert init_response.status == 200 + # Fetch segment playlist = await playlist_response.text() playlist_url = "/".join(parsed_url.path.split("/")[:-1]) @@ -99,15 +106,16 @@ async def test_stream_ended(hass): source = generate_h264_video() stream = preload_stream(hass, source) track = stream.add_provider("hls") - track.num_segments = 2 # Request stream request_stream(hass, source) # Run it dead - segments = 0 - while await track.recv() is not None: - segments += 1 + while True: + segment = await track.recv() + if segment is None: + break + segments = segment.sequence assert segments > 1 assert not track.get_segment() diff --git a/tests/components/stream/test_init.py b/tests/components/stream/test_init.py index dc7892e069a..505de7ca018 100644 --- a/tests/components/stream/test_init.py +++ b/tests/components/stream/test_init.py @@ -72,7 +72,6 @@ async def test_record_service_lookback(hass): ): # Setup stubs hls_mock = MagicMock() - hls_mock.num_segments = 3 hls_mock.target_duration = 2 hls_mock.recv = AsyncMock(return_value=None) stream_mock.return_value.outputs = {"hls": hls_mock} diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index fbbeaf0ff44..ff18cb66590 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -31,12 +31,11 @@ async def test_record_stream(hass, hass_client): recorder = stream.add_provider("recorder") stream.start() - segments = 0 while True: segment = await recorder.recv() if not segment: break - segments += 1 + segments = segment.sequence stream.stop() @@ -76,7 +75,7 @@ async def test_recorder_save(): output.name = "test.mp4" # Run - recorder_save_worker(output, [Segment(1, source, 4)]) + recorder_save_worker(output, [Segment(1, source, 4, (360000, 176400))]) # Assert assert output.getvalue()