From f005c686301560411d854c1803097a70aeb63084 Mon Sep 17 00:00:00 2001 From: uvjustin <46082645+uvjustin@users.noreply.github.com> Date: Tue, 23 Feb 2021 10:37:19 +0800 Subject: [PATCH] Restore stream recorder functionality and add discontinuity support (#46772) * Add discontinuity support to stream recorder * Use same container options for both StreamOutputs * Fix pts adjuster * Remove redundant/incorrect duplicate hls segment check * Use same StreamBuffer across outputs * Remove keepalive check for recorder * Set output video timescale explicitly * Disable avoid_negative_ts --- homeassistant/components/stream/__init__.py | 2 +- homeassistant/components/stream/const.py | 4 + homeassistant/components/stream/core.py | 30 ++----- homeassistant/components/stream/hls.py | 34 ++------ homeassistant/components/stream/recorder.py | 88 +++++++++++---------- homeassistant/components/stream/worker.py | 64 ++++++++------- tests/components/stream/test_recorder.py | 15 +++- 7 files changed, 107 insertions(+), 130 deletions(-) diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 0027152dbd6..2d115c6978d 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -136,7 +136,7 @@ class Stream: @callback def idle_callback(): - if not self.keepalive and fmt in self._outputs: + if (not self.keepalive or fmt == "recorder") and fmt in self._outputs: self.remove_provider(self._outputs[fmt]) self.check_idle() diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 41df806d020..a2557286cf1 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -6,6 +6,10 @@ ATTR_STREAMS = "streams" OUTPUT_FORMATS = ["hls"] +SEGMENT_CONTAINER_FORMAT = "mp4" # format for segments +RECORDER_CONTAINER_FORMAT = "mp4" # format for recorder output +AUDIO_CODECS = {"aac", "mp3"} + FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"} OUTPUT_IDLE_TIMEOUT = 300 # Idle timeout due to inactivity diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index eba6a069698..17d4516344a 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -12,7 +12,7 @@ from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.event import async_call_later from homeassistant.util.decorator import Registry -from .const import ATTR_STREAMS, DOMAIN, MAX_SEGMENTS +from .const import ATTR_STREAMS, DOMAIN PROVIDERS = Registry() @@ -83,13 +83,15 @@ class IdleTimer: class StreamOutput: """Represents a stream output.""" - def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None: + def __init__( + self, hass: HomeAssistant, idle_timer: IdleTimer, deque_maxlen: int = None + ) -> None: """Initialize a stream output.""" self._hass = hass self._idle_timer = idle_timer self._cursor = None self._event = asyncio.Event() - self._segments = deque(maxlen=MAX_SEGMENTS) + self._segments = deque(maxlen=deque_maxlen) @property def name(self) -> str: @@ -101,26 +103,6 @@ class StreamOutput: """Return True if the output is idle.""" return self._idle_timer.idle - @property - def format(self) -> str: - """Return container format.""" - return None - - @property - def audio_codecs(self) -> str: - """Return desired audio codecs.""" - return None - - @property - def video_codecs(self) -> tuple: - """Return desired video codecs.""" - return None - - @property - def container_options(self) -> Callable[[int], dict]: - """Return Callable which takes a sequence number and returns container options.""" - return None - @property def segments(self) -> List[int]: """Return current sequence from segments.""" @@ -177,7 +159,7 @@ class StreamOutput: """Handle cleanup.""" self._event.set() self._idle_timer.clear() - self._segments = deque(maxlen=MAX_SEGMENTS) + self._segments = deque(maxlen=self._segments.maxlen) class StreamView(HomeAssistantView): diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 28d6a300ae7..b2600977971 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -1,13 +1,12 @@ """Provide functionality to stream HLS.""" import io -from typing import Callable from aiohttp import web from homeassistant.core import callback -from .const import FORMAT_CONTENT_TYPE, NUM_PLAYLIST_SEGMENTS -from .core import PROVIDERS, StreamOutput, StreamView +from .const import FORMAT_CONTENT_TYPE, MAX_SEGMENTS, NUM_PLAYLIST_SEGMENTS +from .core import PROVIDERS, HomeAssistant, IdleTimer, StreamOutput, StreamView from .fmp4utils import get_codec_string, get_init, get_m4s @@ -159,32 +158,11 @@ class HlsSegmentView(StreamView): class HlsStreamOutput(StreamOutput): """Represents HLS Output formats.""" + def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None: + """Initialize recorder output.""" + super().__init__(hass, idle_timer, deque_maxlen=MAX_SEGMENTS) + @property def name(self) -> str: """Return provider name.""" return "hls" - - @property - def format(self) -> str: - """Return container format.""" - return "mp4" - - @property - def audio_codecs(self) -> str: - """Return desired audio codecs.""" - return {"aac", "mp3"} - - @property - def video_codecs(self) -> tuple: - """Return desired video codecs.""" - return {"hevc", "h264"} - - @property - def container_options(self) -> Callable[[int], dict]: - """Return Callable which takes a sequence number and returns container options.""" - return lambda sequence: { - # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970 - "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont", - "avoid_negative_ts": "make_non_negative", - "fragment_index": str(sequence), - } diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index 0b77d0ba630..0344e220647 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -2,12 +2,13 @@ import logging import os import threading -from typing import List +from typing import Deque, List import av from homeassistant.core import HomeAssistant, callback +from .const import RECORDER_CONTAINER_FORMAT, SEGMENT_CONTAINER_FORMAT from .core import PROVIDERS, IdleTimer, Segment, StreamOutput _LOGGER = logging.getLogger(__name__) @@ -18,28 +19,20 @@ def async_setup_recorder(hass): """Only here so Provider Registry works.""" -def recorder_save_worker(file_out: str, segments: List[Segment], container_format: str): +def recorder_save_worker(file_out: str, segments: Deque[Segment]): """Handle saving stream.""" if not os.path.exists(os.path.dirname(file_out)): os.makedirs(os.path.dirname(file_out), exist_ok=True) - first_pts = {"video": None, "audio": None} - output = av.open(file_out, "w", format=container_format) + pts_adjuster = {"video": None, "audio": None} + output = None output_v = None output_a = None - # Get first_pts values from first segment - if len(segments) > 0: - segment = segments[0] - source = av.open(segment.segment, "r", format=container_format) - source_v = source.streams.video[0] - first_pts["video"] = source_v.start_time - if len(source.streams.audio) > 0: - source_a = source.streams.audio[0] - first_pts["audio"] = int( - source_v.start_time * source_v.time_base / source_a.time_base - ) - source.close() + last_stream_id = None + # The running duration of processed segments. Note that this is in av.time_base + # units which seem to be defined inversely to how stream time_bases are defined + running_duration = 0 last_sequence = float("-inf") for segment in segments: @@ -50,26 +43,54 @@ def recorder_save_worker(file_out: str, segments: List[Segment], container_forma last_sequence = segment.sequence # Open segment - source = av.open(segment.segment, "r", format=container_format) + source = av.open(segment.segment, "r", format=SEGMENT_CONTAINER_FORMAT) source_v = source.streams.video[0] - # Add output streams + source_a = source.streams.audio[0] if len(source.streams.audio) > 0 else None + + # Create output on first segment + if not output: + output = av.open( + file_out, + "w", + format=RECORDER_CONTAINER_FORMAT, + container_options={ + "video_track_timescale": str(int(1 / source_v.time_base)) + }, + ) + + # Add output streams if necessary if not output_v: output_v = output.add_stream(template=source_v) context = output_v.codec_context context.flags |= "GLOBAL_HEADER" - if not output_a and len(source.streams.audio) > 0: - source_a = source.streams.audio[0] + if source_a and not output_a: output_a = output.add_stream(template=source_a) + # Recalculate pts adjustments on first segment and on any discontinuity + # We are assuming time base is the same across all discontinuities + if last_stream_id != segment.stream_id: + last_stream_id = segment.stream_id + pts_adjuster["video"] = int( + (running_duration - source.start_time) + / (av.time_base * source_v.time_base) + ) + if source_a: + pts_adjuster["audio"] = int( + (running_duration - source.start_time) + / (av.time_base * source_a.time_base) + ) + # Remux video for packet in source.demux(): if packet.dts is None: continue - packet.pts -= first_pts[packet.stream.type] - packet.dts -= first_pts[packet.stream.type] + packet.pts += pts_adjuster[packet.stream.type] + packet.dts += pts_adjuster[packet.stream.type] packet.stream = output_v if packet.stream.type == "video" else output_a output.mux(packet) + running_duration += source.duration - source.start_time + source.close() output.close() @@ -83,33 +104,15 @@ class RecorderOutput(StreamOutput): """Initialize recorder output.""" super().__init__(hass, idle_timer) self.video_path = None - self._segments = [] @property def name(self) -> str: """Return provider name.""" return "recorder" - @property - def format(self) -> str: - """Return container format.""" - return "mp4" - - @property - def audio_codecs(self) -> str: - """Return desired audio codec.""" - return {"aac", "mp3"} - - @property - def video_codecs(self) -> tuple: - """Return desired video codecs.""" - return {"hevc", "h264"} - def prepend(self, segments: List[Segment]) -> None: """Prepend segments to existing list.""" - own_segments = self.segments - segments = [s for s in segments if s.sequence not in own_segments] - self._segments = segments + self._segments + self._segments.extendleft(reversed(segments)) def cleanup(self): """Write recording and clean up.""" @@ -117,9 +120,8 @@ class RecorderOutput(StreamOutput): thread = threading.Thread( name="recorder_save_worker", target=recorder_save_worker, - args=(self.video_path, self._segments, self.format), + args=(self.video_path, self._segments), ) thread.start() super().cleanup() - self._segments = [] diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 61d4f5db17a..d5760877c43 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -6,10 +6,12 @@ import logging import av from .const import ( + AUDIO_CODECS, MAX_MISSING_DTS, MAX_TIMESTAMP_GAP, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO, + SEGMENT_CONTAINER_FORMAT, STREAM_TIMEOUT, ) from .core import Segment, StreamBuffer @@ -17,19 +19,20 @@ from .core import Segment, StreamBuffer _LOGGER = logging.getLogger(__name__) -def create_stream_buffer(stream_output, video_stream, audio_stream, sequence): +def create_stream_buffer(video_stream, audio_stream, sequence): """Create a new StreamBuffer.""" segment = io.BytesIO() - container_options = ( - stream_output.container_options(sequence) - if stream_output.container_options - else {} - ) + container_options = { + # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970 + "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont", + "avoid_negative_ts": "disabled", + "fragment_index": str(sequence), + } output = av.open( segment, mode="w", - format=stream_output.format, + format=SEGMENT_CONTAINER_FORMAT, container_options={ "video_track_timescale": str(int(1 / video_stream.time_base)), **container_options, @@ -38,7 +41,7 @@ def create_stream_buffer(stream_output, video_stream, audio_stream, sequence): vstream = output.add_stream(template=video_stream) # Check if audio is requested astream = None - if audio_stream and audio_stream.name in stream_output.audio_codecs: + if audio_stream and audio_stream.name in AUDIO_CODECS: astream = output.add_stream(template=audio_stream) return StreamBuffer(segment, output, vstream, astream) @@ -52,10 +55,11 @@ class SegmentBuffer: self._video_stream = None self._audio_stream = None self._outputs_callback = outputs_callback - # tuple of StreamOutput, StreamBuffer + # Each element is a StreamOutput self._outputs = [] self._sequence = 0 self._segment_start_pts = None + self._stream_buffer = None def set_streams(self, video_stream, audio_stream): """Initialize output buffer with streams from container.""" @@ -70,14 +74,10 @@ class SegmentBuffer: # Fetch the latest StreamOutputs, which may have changed since the # worker started. - self._outputs = [] - for stream_output in self._outputs_callback().values(): - if self._video_stream.name not in stream_output.video_codecs: - continue - buffer = create_stream_buffer( - stream_output, self._video_stream, self._audio_stream, self._sequence - ) - self._outputs.append((buffer, stream_output)) + self._outputs = self._outputs_callback().values() + self._stream_buffer = create_stream_buffer( + self._video_stream, self._audio_stream, self._sequence + ) def mux_packet(self, packet): """Mux a packet to the appropriate StreamBuffers.""" @@ -93,22 +93,21 @@ class SegmentBuffer: self.reset(packet.pts) # Mux the packet - for (buffer, _) in self._outputs: - if packet.stream == self._video_stream: - packet.stream = buffer.vstream - elif packet.stream == self._audio_stream: - packet.stream = buffer.astream - else: - continue - buffer.output.mux(packet) + if packet.stream == self._video_stream: + packet.stream = self._stream_buffer.vstream + self._stream_buffer.output.mux(packet) + elif packet.stream == self._audio_stream: + packet.stream = self._stream_buffer.astream + self._stream_buffer.output.mux(packet) def flush(self, duration): """Create a segment from the buffered packets and write to output.""" - for (buffer, stream_output) in self._outputs: - buffer.output.close() - stream_output.put( - Segment(self._sequence, buffer.segment, duration, self._stream_id) - ) + self._stream_buffer.output.close() + segment = Segment( + self._sequence, self._stream_buffer.segment, duration, self._stream_id + ) + for stream_output in self._outputs: + stream_output.put(segment) def discontinuity(self): """Mark the stream as having been restarted.""" @@ -118,9 +117,8 @@ class SegmentBuffer: self._stream_id += 1 def close(self): - """Close all StreamBuffers.""" - for (buffer, _) in self._outputs: - buffer.output.close() + """Close stream buffer.""" + self._stream_buffer.output.close() def stream_worker(source, options, segment_buffer, quit_event): diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index 9d418c360b1..199020097bd 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -174,7 +174,20 @@ async def test_recorder_save(tmpdir): filename = f"{tmpdir}/test.mp4" # Run - recorder_save_worker(filename, [Segment(1, source, 4)], "mp4") + recorder_save_worker(filename, [Segment(1, source, 4)]) + + # Assert + assert os.path.exists(filename) + + +async def test_recorder_discontinuity(tmpdir): + """Test recorder save across a discontinuity.""" + # Setup + source = generate_h264_video() + filename = f"{tmpdir}/test.mp4" + + # Run + recorder_save_worker(filename, [Segment(1, source, 4, 0), Segment(2, source, 4, 1)]) # Assert assert os.path.exists(filename)