Restore stream recorder functionality and add discontinuity support (#46772)

* Add discontinuity support to stream recorder

* Use same container options for both StreamOutputs

* Fix pts adjuster

* Remove redundant/incorrect duplicate hls segment check

* Use same StreamBuffer across outputs

* Remove keepalive check for recorder

* Set output video timescale explicitly

* Disable avoid_negative_ts
This commit is contained in:
uvjustin 2021-02-23 10:37:19 +08:00 committed by GitHub
parent 1cecf229b9
commit f005c68630
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 107 additions and 130 deletions

View File

@ -136,7 +136,7 @@ class Stream:
@callback @callback
def idle_callback(): def idle_callback():
if not self.keepalive and fmt in self._outputs: if (not self.keepalive or fmt == "recorder") and fmt in self._outputs:
self.remove_provider(self._outputs[fmt]) self.remove_provider(self._outputs[fmt])
self.check_idle() self.check_idle()

View File

@ -6,6 +6,10 @@ ATTR_STREAMS = "streams"
OUTPUT_FORMATS = ["hls"] OUTPUT_FORMATS = ["hls"]
SEGMENT_CONTAINER_FORMAT = "mp4" # format for segments
RECORDER_CONTAINER_FORMAT = "mp4" # format for recorder output
AUDIO_CODECS = {"aac", "mp3"}
FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"} FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"}
OUTPUT_IDLE_TIMEOUT = 300 # Idle timeout due to inactivity OUTPUT_IDLE_TIMEOUT = 300 # Idle timeout due to inactivity

View File

@ -12,7 +12,7 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_call_later from homeassistant.helpers.event import async_call_later
from homeassistant.util.decorator import Registry from homeassistant.util.decorator import Registry
from .const import ATTR_STREAMS, DOMAIN, MAX_SEGMENTS from .const import ATTR_STREAMS, DOMAIN
PROVIDERS = Registry() PROVIDERS = Registry()
@ -83,13 +83,15 @@ class IdleTimer:
class StreamOutput: class StreamOutput:
"""Represents a stream output.""" """Represents a stream output."""
def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None: def __init__(
self, hass: HomeAssistant, idle_timer: IdleTimer, deque_maxlen: int = None
) -> None:
"""Initialize a stream output.""" """Initialize a stream output."""
self._hass = hass self._hass = hass
self._idle_timer = idle_timer self._idle_timer = idle_timer
self._cursor = None self._cursor = None
self._event = asyncio.Event() self._event = asyncio.Event()
self._segments = deque(maxlen=MAX_SEGMENTS) self._segments = deque(maxlen=deque_maxlen)
@property @property
def name(self) -> str: def name(self) -> str:
@ -101,26 +103,6 @@ class StreamOutput:
"""Return True if the output is idle.""" """Return True if the output is idle."""
return self._idle_timer.idle return self._idle_timer.idle
@property
def format(self) -> str:
"""Return container format."""
return None
@property
def audio_codecs(self) -> str:
"""Return desired audio codecs."""
return None
@property
def video_codecs(self) -> tuple:
"""Return desired video codecs."""
return None
@property
def container_options(self) -> Callable[[int], dict]:
"""Return Callable which takes a sequence number and returns container options."""
return None
@property @property
def segments(self) -> List[int]: def segments(self) -> List[int]:
"""Return current sequence from segments.""" """Return current sequence from segments."""
@ -177,7 +159,7 @@ class StreamOutput:
"""Handle cleanup.""" """Handle cleanup."""
self._event.set() self._event.set()
self._idle_timer.clear() self._idle_timer.clear()
self._segments = deque(maxlen=MAX_SEGMENTS) self._segments = deque(maxlen=self._segments.maxlen)
class StreamView(HomeAssistantView): class StreamView(HomeAssistantView):

View File

@ -1,13 +1,12 @@
"""Provide functionality to stream HLS.""" """Provide functionality to stream HLS."""
import io import io
from typing import Callable
from aiohttp import web from aiohttp import web
from homeassistant.core import callback from homeassistant.core import callback
from .const import FORMAT_CONTENT_TYPE, NUM_PLAYLIST_SEGMENTS from .const import FORMAT_CONTENT_TYPE, MAX_SEGMENTS, NUM_PLAYLIST_SEGMENTS
from .core import PROVIDERS, StreamOutput, StreamView from .core import PROVIDERS, HomeAssistant, IdleTimer, StreamOutput, StreamView
from .fmp4utils import get_codec_string, get_init, get_m4s from .fmp4utils import get_codec_string, get_init, get_m4s
@ -159,32 +158,11 @@ class HlsSegmentView(StreamView):
class HlsStreamOutput(StreamOutput): class HlsStreamOutput(StreamOutput):
"""Represents HLS Output formats.""" """Represents HLS Output formats."""
def __init__(self, hass: HomeAssistant, idle_timer: IdleTimer) -> None:
"""Initialize recorder output."""
super().__init__(hass, idle_timer, deque_maxlen=MAX_SEGMENTS)
@property @property
def name(self) -> str: def name(self) -> str:
"""Return provider name.""" """Return provider name."""
return "hls" return "hls"
@property
def format(self) -> str:
"""Return container format."""
return "mp4"
@property
def audio_codecs(self) -> str:
"""Return desired audio codecs."""
return {"aac", "mp3"}
@property
def video_codecs(self) -> tuple:
"""Return desired video codecs."""
return {"hevc", "h264"}
@property
def container_options(self) -> Callable[[int], dict]:
"""Return Callable which takes a sequence number and returns container options."""
return lambda sequence: {
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont",
"avoid_negative_ts": "make_non_negative",
"fragment_index": str(sequence),
}

View File

@ -2,12 +2,13 @@
import logging import logging
import os import os
import threading import threading
from typing import List from typing import Deque, List
import av import av
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from .const import RECORDER_CONTAINER_FORMAT, SEGMENT_CONTAINER_FORMAT
from .core import PROVIDERS, IdleTimer, Segment, StreamOutput from .core import PROVIDERS, IdleTimer, Segment, StreamOutput
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -18,28 +19,20 @@ def async_setup_recorder(hass):
"""Only here so Provider Registry works.""" """Only here so Provider Registry works."""
def recorder_save_worker(file_out: str, segments: List[Segment], container_format: str): def recorder_save_worker(file_out: str, segments: Deque[Segment]):
"""Handle saving stream.""" """Handle saving stream."""
if not os.path.exists(os.path.dirname(file_out)): if not os.path.exists(os.path.dirname(file_out)):
os.makedirs(os.path.dirname(file_out), exist_ok=True) os.makedirs(os.path.dirname(file_out), exist_ok=True)
first_pts = {"video": None, "audio": None} pts_adjuster = {"video": None, "audio": None}
output = av.open(file_out, "w", format=container_format) output = None
output_v = None output_v = None
output_a = None output_a = None
# Get first_pts values from first segment last_stream_id = None
if len(segments) > 0: # The running duration of processed segments. Note that this is in av.time_base
segment = segments[0] # units which seem to be defined inversely to how stream time_bases are defined
source = av.open(segment.segment, "r", format=container_format) running_duration = 0
source_v = source.streams.video[0]
first_pts["video"] = source_v.start_time
if len(source.streams.audio) > 0:
source_a = source.streams.audio[0]
first_pts["audio"] = int(
source_v.start_time * source_v.time_base / source_a.time_base
)
source.close()
last_sequence = float("-inf") last_sequence = float("-inf")
for segment in segments: for segment in segments:
@ -50,26 +43,54 @@ def recorder_save_worker(file_out: str, segments: List[Segment], container_forma
last_sequence = segment.sequence last_sequence = segment.sequence
# Open segment # Open segment
source = av.open(segment.segment, "r", format=container_format) source = av.open(segment.segment, "r", format=SEGMENT_CONTAINER_FORMAT)
source_v = source.streams.video[0] source_v = source.streams.video[0]
# Add output streams source_a = source.streams.audio[0] if len(source.streams.audio) > 0 else None
# Create output on first segment
if not output:
output = av.open(
file_out,
"w",
format=RECORDER_CONTAINER_FORMAT,
container_options={
"video_track_timescale": str(int(1 / source_v.time_base))
},
)
# Add output streams if necessary
if not output_v: if not output_v:
output_v = output.add_stream(template=source_v) output_v = output.add_stream(template=source_v)
context = output_v.codec_context context = output_v.codec_context
context.flags |= "GLOBAL_HEADER" context.flags |= "GLOBAL_HEADER"
if not output_a and len(source.streams.audio) > 0: if source_a and not output_a:
source_a = source.streams.audio[0]
output_a = output.add_stream(template=source_a) output_a = output.add_stream(template=source_a)
# Recalculate pts adjustments on first segment and on any discontinuity
# We are assuming time base is the same across all discontinuities
if last_stream_id != segment.stream_id:
last_stream_id = segment.stream_id
pts_adjuster["video"] = int(
(running_duration - source.start_time)
/ (av.time_base * source_v.time_base)
)
if source_a:
pts_adjuster["audio"] = int(
(running_duration - source.start_time)
/ (av.time_base * source_a.time_base)
)
# Remux video # Remux video
for packet in source.demux(): for packet in source.demux():
if packet.dts is None: if packet.dts is None:
continue continue
packet.pts -= first_pts[packet.stream.type] packet.pts += pts_adjuster[packet.stream.type]
packet.dts -= first_pts[packet.stream.type] packet.dts += pts_adjuster[packet.stream.type]
packet.stream = output_v if packet.stream.type == "video" else output_a packet.stream = output_v if packet.stream.type == "video" else output_a
output.mux(packet) output.mux(packet)
running_duration += source.duration - source.start_time
source.close() source.close()
output.close() output.close()
@ -83,33 +104,15 @@ class RecorderOutput(StreamOutput):
"""Initialize recorder output.""" """Initialize recorder output."""
super().__init__(hass, idle_timer) super().__init__(hass, idle_timer)
self.video_path = None self.video_path = None
self._segments = []
@property @property
def name(self) -> str: def name(self) -> str:
"""Return provider name.""" """Return provider name."""
return "recorder" return "recorder"
@property
def format(self) -> str:
"""Return container format."""
return "mp4"
@property
def audio_codecs(self) -> str:
"""Return desired audio codec."""
return {"aac", "mp3"}
@property
def video_codecs(self) -> tuple:
"""Return desired video codecs."""
return {"hevc", "h264"}
def prepend(self, segments: List[Segment]) -> None: def prepend(self, segments: List[Segment]) -> None:
"""Prepend segments to existing list.""" """Prepend segments to existing list."""
own_segments = self.segments self._segments.extendleft(reversed(segments))
segments = [s for s in segments if s.sequence not in own_segments]
self._segments = segments + self._segments
def cleanup(self): def cleanup(self):
"""Write recording and clean up.""" """Write recording and clean up."""
@ -117,9 +120,8 @@ class RecorderOutput(StreamOutput):
thread = threading.Thread( thread = threading.Thread(
name="recorder_save_worker", name="recorder_save_worker",
target=recorder_save_worker, target=recorder_save_worker,
args=(self.video_path, self._segments, self.format), args=(self.video_path, self._segments),
) )
thread.start() thread.start()
super().cleanup() super().cleanup()
self._segments = []

View File

@ -6,10 +6,12 @@ import logging
import av import av
from .const import ( from .const import (
AUDIO_CODECS,
MAX_MISSING_DTS, MAX_MISSING_DTS,
MAX_TIMESTAMP_GAP, MAX_TIMESTAMP_GAP,
MIN_SEGMENT_DURATION, MIN_SEGMENT_DURATION,
PACKETS_TO_WAIT_FOR_AUDIO, PACKETS_TO_WAIT_FOR_AUDIO,
SEGMENT_CONTAINER_FORMAT,
STREAM_TIMEOUT, STREAM_TIMEOUT,
) )
from .core import Segment, StreamBuffer from .core import Segment, StreamBuffer
@ -17,19 +19,20 @@ from .core import Segment, StreamBuffer
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def create_stream_buffer(stream_output, video_stream, audio_stream, sequence): def create_stream_buffer(video_stream, audio_stream, sequence):
"""Create a new StreamBuffer.""" """Create a new StreamBuffer."""
segment = io.BytesIO() segment = io.BytesIO()
container_options = ( container_options = {
stream_output.container_options(sequence) # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
if stream_output.container_options "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont",
else {} "avoid_negative_ts": "disabled",
) "fragment_index": str(sequence),
}
output = av.open( output = av.open(
segment, segment,
mode="w", mode="w",
format=stream_output.format, format=SEGMENT_CONTAINER_FORMAT,
container_options={ container_options={
"video_track_timescale": str(int(1 / video_stream.time_base)), "video_track_timescale": str(int(1 / video_stream.time_base)),
**container_options, **container_options,
@ -38,7 +41,7 @@ def create_stream_buffer(stream_output, video_stream, audio_stream, sequence):
vstream = output.add_stream(template=video_stream) vstream = output.add_stream(template=video_stream)
# Check if audio is requested # Check if audio is requested
astream = None astream = None
if audio_stream and audio_stream.name in stream_output.audio_codecs: if audio_stream and audio_stream.name in AUDIO_CODECS:
astream = output.add_stream(template=audio_stream) astream = output.add_stream(template=audio_stream)
return StreamBuffer(segment, output, vstream, astream) return StreamBuffer(segment, output, vstream, astream)
@ -52,10 +55,11 @@ class SegmentBuffer:
self._video_stream = None self._video_stream = None
self._audio_stream = None self._audio_stream = None
self._outputs_callback = outputs_callback self._outputs_callback = outputs_callback
# tuple of StreamOutput, StreamBuffer # Each element is a StreamOutput
self._outputs = [] self._outputs = []
self._sequence = 0 self._sequence = 0
self._segment_start_pts = None self._segment_start_pts = None
self._stream_buffer = None
def set_streams(self, video_stream, audio_stream): def set_streams(self, video_stream, audio_stream):
"""Initialize output buffer with streams from container.""" """Initialize output buffer with streams from container."""
@ -70,14 +74,10 @@ class SegmentBuffer:
# Fetch the latest StreamOutputs, which may have changed since the # Fetch the latest StreamOutputs, which may have changed since the
# worker started. # worker started.
self._outputs = [] self._outputs = self._outputs_callback().values()
for stream_output in self._outputs_callback().values(): self._stream_buffer = create_stream_buffer(
if self._video_stream.name not in stream_output.video_codecs: self._video_stream, self._audio_stream, self._sequence
continue )
buffer = create_stream_buffer(
stream_output, self._video_stream, self._audio_stream, self._sequence
)
self._outputs.append((buffer, stream_output))
def mux_packet(self, packet): def mux_packet(self, packet):
"""Mux a packet to the appropriate StreamBuffers.""" """Mux a packet to the appropriate StreamBuffers."""
@ -93,22 +93,21 @@ class SegmentBuffer:
self.reset(packet.pts) self.reset(packet.pts)
# Mux the packet # Mux the packet
for (buffer, _) in self._outputs: if packet.stream == self._video_stream:
if packet.stream == self._video_stream: packet.stream = self._stream_buffer.vstream
packet.stream = buffer.vstream self._stream_buffer.output.mux(packet)
elif packet.stream == self._audio_stream: elif packet.stream == self._audio_stream:
packet.stream = buffer.astream packet.stream = self._stream_buffer.astream
else: self._stream_buffer.output.mux(packet)
continue
buffer.output.mux(packet)
def flush(self, duration): def flush(self, duration):
"""Create a segment from the buffered packets and write to output.""" """Create a segment from the buffered packets and write to output."""
for (buffer, stream_output) in self._outputs: self._stream_buffer.output.close()
buffer.output.close() segment = Segment(
stream_output.put( self._sequence, self._stream_buffer.segment, duration, self._stream_id
Segment(self._sequence, buffer.segment, duration, self._stream_id) )
) for stream_output in self._outputs:
stream_output.put(segment)
def discontinuity(self): def discontinuity(self):
"""Mark the stream as having been restarted.""" """Mark the stream as having been restarted."""
@ -118,9 +117,8 @@ class SegmentBuffer:
self._stream_id += 1 self._stream_id += 1
def close(self): def close(self):
"""Close all StreamBuffers.""" """Close stream buffer."""
for (buffer, _) in self._outputs: self._stream_buffer.output.close()
buffer.output.close()
def stream_worker(source, options, segment_buffer, quit_event): def stream_worker(source, options, segment_buffer, quit_event):

View File

@ -174,7 +174,20 @@ async def test_recorder_save(tmpdir):
filename = f"{tmpdir}/test.mp4" filename = f"{tmpdir}/test.mp4"
# Run # Run
recorder_save_worker(filename, [Segment(1, source, 4)], "mp4") recorder_save_worker(filename, [Segment(1, source, 4)])
# Assert
assert os.path.exists(filename)
async def test_recorder_discontinuity(tmpdir):
"""Test recorder save across a discontinuity."""
# Setup
source = generate_h264_video()
filename = f"{tmpdir}/test.mp4"
# Run
recorder_save_worker(filename, [Segment(1, source, 4, 0), Segment(2, source, 4, 1)])
# Assert # Assert
assert os.path.exists(filename) assert os.path.exists(filename)