Fix playback of hls cameras in stream (#75166)

This commit is contained in:
uvjustin 2022-07-15 03:24:24 +08:00 committed by GitHub
parent 6184f0557d
commit 75892385bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 41 deletions

View File

@ -57,9 +57,15 @@ from .const import (
SOURCE_TIMEOUT, SOURCE_TIMEOUT,
STREAM_RESTART_INCREMENT, STREAM_RESTART_INCREMENT,
STREAM_RESTART_RESET_TIME, STREAM_RESTART_RESET_TIME,
TARGET_SEGMENT_DURATION_NON_LL_HLS,
) )
from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings from .core import (
PROVIDERS,
STREAM_SETTINGS_NON_LL_HLS,
IdleTimer,
KeyFrameConverter,
StreamOutput,
StreamSettings,
)
from .diagnostics import Diagnostics from .diagnostics import Diagnostics
from .hls import HlsStreamOutput, async_setup_hls from .hls import HlsStreamOutput, async_setup_hls
@ -224,14 +230,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
hls_part_timeout=2 * conf[CONF_PART_DURATION], hls_part_timeout=2 * conf[CONF_PART_DURATION],
) )
else: else:
hass.data[DOMAIN][ATTR_SETTINGS] = StreamSettings( hass.data[DOMAIN][ATTR_SETTINGS] = STREAM_SETTINGS_NON_LL_HLS
ll_hls=False,
min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS
- SEGMENT_DURATION_ADJUSTER,
part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
hls_advance_part_limit=3,
hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
# Setup HLS # Setup HLS
hls_endpoint = async_setup_hls(hass) hls_endpoint = async_setup_hls(hass)

View File

@ -5,6 +5,7 @@ import asyncio
from collections import deque from collections import deque
from collections.abc import Callable, Coroutine, Iterable from collections.abc import Callable, Coroutine, Iterable
import datetime import datetime
import logging
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
from aiohttp import web from aiohttp import web
@ -16,13 +17,20 @@ from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.event import async_call_later from homeassistant.helpers.event import async_call_later
from homeassistant.util.decorator import Registry from homeassistant.util.decorator import Registry
from .const import ATTR_STREAMS, DOMAIN from .const import (
ATTR_STREAMS,
DOMAIN,
SEGMENT_DURATION_ADJUSTER,
TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from av import CodecContext, Packet from av import CodecContext, Packet
from . import Stream from . import Stream
_LOGGER = logging.getLogger(__name__)
PROVIDERS: Registry[str, type[StreamOutput]] = Registry() PROVIDERS: Registry[str, type[StreamOutput]] = Registry()
@ -37,6 +45,15 @@ class StreamSettings:
hls_part_timeout: float = attr.ib() hls_part_timeout: float = attr.ib()
STREAM_SETTINGS_NON_LL_HLS = StreamSettings(
ll_hls=False,
min_segment_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS - SEGMENT_DURATION_ADJUSTER,
part_target_duration=TARGET_SEGMENT_DURATION_NON_LL_HLS,
hls_advance_part_limit=3,
hls_part_timeout=TARGET_SEGMENT_DURATION_NON_LL_HLS,
)
@attr.s(slots=True) @attr.s(slots=True)
class Part: class Part:
"""Represent a segment part.""" """Represent a segment part."""
@ -426,12 +443,22 @@ class KeyFrameConverter:
return return
packet = self.packet packet = self.packet
self.packet = None self.packet = None
for _ in range(2): # Retry once if codec context needs to be flushed
try:
# decode packet (flush afterwards) # decode packet (flush afterwards)
frames = self._codec_context.decode(packet) frames = self._codec_context.decode(packet)
for _i in range(2): for _i in range(2):
if frames: if frames:
break break
frames = self._codec_context.decode(None) frames = self._codec_context.decode(None)
break
except EOFError:
_LOGGER.debug("Codec context needs flushing, attempting to reopen")
self._codec_context.close()
self._codec_context.open()
else:
_LOGGER.debug("Unable to decode keyframe")
return
if frames: if frames:
frame = frames[0] frame = frames[0]
if width and height: if width and height:

View File

@ -2,6 +2,10 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Generator from collections.abc import Generator
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from io import BytesIO
def find_box( def find_box(
@ -135,3 +139,11 @@ def get_codec_string(mp4_bytes: bytes) -> str:
codecs.append(codec) codecs.append(codec)
return ",".join(codecs) return ",".join(codecs)
def read_init(bytes_io: BytesIO) -> bytes:
"""Read the init from a mp4 file."""
bytes_io.seek(24)
moov_len = int.from_bytes(bytes_io.read(4), byteorder="big")
bytes_io.seek(0)
return bytes_io.read(24 + moov_len)

View File

@ -5,11 +5,12 @@ from collections import defaultdict, deque
from collections.abc import Callable, Generator, Iterator, Mapping from collections.abc import Callable, Generator, Iterator, Mapping
import contextlib import contextlib
import datetime import datetime
from io import BytesIO from io import SEEK_END, BytesIO
import logging import logging
from threading import Event from threading import Event
from typing import Any, cast from typing import Any, cast
import attr
import av import av
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@ -24,8 +25,16 @@ from .const import (
SEGMENT_CONTAINER_FORMAT, SEGMENT_CONTAINER_FORMAT,
SOURCE_TIMEOUT, SOURCE_TIMEOUT,
) )
from .core import KeyFrameConverter, Part, Segment, StreamOutput, StreamSettings from .core import (
STREAM_SETTINGS_NON_LL_HLS,
KeyFrameConverter,
Part,
Segment,
StreamOutput,
StreamSettings,
)
from .diagnostics import Diagnostics from .diagnostics import Diagnostics
from .fmp4utils import read_init
from .hls import HlsStreamOutput from .hls import HlsStreamOutput
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -108,7 +117,7 @@ class StreamMuxer:
hass: HomeAssistant, hass: HomeAssistant,
video_stream: av.video.VideoStream, video_stream: av.video.VideoStream,
audio_stream: av.audio.stream.AudioStream | None, audio_stream: av.audio.stream.AudioStream | None,
audio_bsf: av.BitStreamFilterContext | None, audio_bsf: av.BitStreamFilter | None,
stream_state: StreamState, stream_state: StreamState,
stream_settings: StreamSettings, stream_settings: StreamSettings,
) -> None: ) -> None:
@ -120,6 +129,7 @@ class StreamMuxer:
self._input_video_stream: av.video.VideoStream = video_stream self._input_video_stream: av.video.VideoStream = video_stream
self._input_audio_stream: av.audio.stream.AudioStream | None = audio_stream self._input_audio_stream: av.audio.stream.AudioStream | None = audio_stream
self._audio_bsf = audio_bsf self._audio_bsf = audio_bsf
self._audio_bsf_context: av.BitStreamFilterContext = None
self._output_video_stream: av.video.VideoStream = None self._output_video_stream: av.video.VideoStream = None
self._output_audio_stream: av.audio.stream.AudioStream | None = None self._output_audio_stream: av.audio.stream.AudioStream | None = None
self._segment: Segment | None = None self._segment: Segment | None = None
@ -151,7 +161,7 @@ class StreamMuxer:
**{ **{
# Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970 # Removed skip_sidx - see https://github.com/home-assistant/core/pull/39970
# "cmaf" flag replaces several of the movflags used, but too recent to use for now # "cmaf" flag replaces several of the movflags used, but too recent to use for now
"movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer", "movflags": "frag_custom+empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
# Sometimes the first segment begins with negative timestamps, and this setting just # Sometimes the first segment begins with negative timestamps, and this setting just
# adjusts the timestamps in the output from that segment to start from 0. Helps from # adjusts the timestamps in the output from that segment to start from 0. Helps from
# having to make some adjustments in test_durations # having to make some adjustments in test_durations
@ -164,7 +174,7 @@ class StreamMuxer:
# Fragment durations may exceed the 15% allowed variance but it seems ok # Fragment durations may exceed the 15% allowed variance but it seems ok
**( **(
{ {
"movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer", "movflags": "empty_moov+default_base_moof+frag_discont+negative_cts_offsets+skip_trailer+delay_moov",
# Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in
# a "Part" that can be combined with the data from all the other "Part"s, plus an init # a "Part" that can be combined with the data from all the other "Part"s, plus an init
# section, to reconstitute the data in a "Segment". # section, to reconstitute the data in a "Segment".
@ -194,8 +204,11 @@ class StreamMuxer:
# Check if audio is requested # Check if audio is requested
output_astream = None output_astream = None
if input_astream: if input_astream:
if self._audio_bsf:
self._audio_bsf_context = self._audio_bsf.create()
self._audio_bsf_context.set_input_stream(input_astream)
output_astream = container.add_stream( output_astream = container.add_stream(
template=self._audio_bsf or input_astream template=self._audio_bsf_context or input_astream
) )
return container, output_vstream, output_astream return container, output_vstream, output_astream
@ -238,15 +251,29 @@ class StreamMuxer:
self._part_has_keyframe |= packet.is_keyframe self._part_has_keyframe |= packet.is_keyframe
elif packet.stream == self._input_audio_stream: elif packet.stream == self._input_audio_stream:
if self._audio_bsf: if self._audio_bsf_context:
self._audio_bsf.send(packet) self._audio_bsf_context.send(packet)
while packet := self._audio_bsf.recv(): while packet := self._audio_bsf_context.recv():
packet.stream = self._output_audio_stream packet.stream = self._output_audio_stream
self._av_output.mux(packet) self._av_output.mux(packet)
return return
packet.stream = self._output_audio_stream packet.stream = self._output_audio_stream
self._av_output.mux(packet) self._av_output.mux(packet)
def create_segment(self) -> None:
"""Create a segment when the moov is ready."""
self._segment = Segment(
sequence=self._stream_state.sequence,
stream_id=self._stream_state.stream_id,
init=read_init(self._memory_file),
# Fetch the latest StreamOutputs, which may have changed since the
# worker started.
stream_outputs=self._stream_state.outputs,
start_time=self._start_time,
)
self._memory_file_pos = self._memory_file.tell()
self._memory_file.seek(0, SEEK_END)
def check_flush_part(self, packet: av.Packet) -> None: def check_flush_part(self, packet: av.Packet) -> None:
"""Check for and mark a part segment boundary and record its duration.""" """Check for and mark a part segment boundary and record its duration."""
if self._memory_file_pos == self._memory_file.tell(): if self._memory_file_pos == self._memory_file.tell():
@ -254,16 +281,10 @@ class StreamMuxer:
if self._segment is None: if self._segment is None:
# We have our first non-zero byte position. This means the init has just # We have our first non-zero byte position. This means the init has just
# been written. Create a Segment and put it to the queue of each output. # been written. Create a Segment and put it to the queue of each output.
self._segment = Segment( self.create_segment()
sequence=self._stream_state.sequence, # When using delay_moov, the moov is not written until a moof is also ready
stream_id=self._stream_state.stream_id, # Flush the moof
init=self._memory_file.getvalue(), self.flush(packet, last_part=False)
# Fetch the latest StreamOutputs, which may have changed since the
# worker started.
stream_outputs=self._stream_state.outputs,
start_time=self._start_time,
)
self._memory_file_pos = self._memory_file.tell()
else: # These are the ends of the part segments else: # These are the ends of the part segments
self.flush(packet, last_part=False) self.flush(packet, last_part=False)
@ -297,6 +318,10 @@ class StreamMuxer:
# Closing the av_output will write the remaining buffered data to the # Closing the av_output will write the remaining buffered data to the
# memory_file as a new moof/mdat. # memory_file as a new moof/mdat.
self._av_output.close() self._av_output.close()
# With delay_moov, this may be the first time the file pointer has
# moved, so the segment may not yet have been created
if not self._segment:
self.create_segment()
elif not self._part_has_keyframe: elif not self._part_has_keyframe:
# Parts which are not the last part or an independent part should # Parts which are not the last part or an independent part should
# not have durations below 0.85 of the part target duration. # not have durations below 0.85 of the part target duration.
@ -305,6 +330,9 @@ class StreamMuxer:
self._part_start_dts self._part_start_dts
+ 0.85 * self._stream_settings.part_target_duration / packet.time_base, + 0.85 * self._stream_settings.part_target_duration / packet.time_base,
) )
# Undo dts adjustments if we don't have ll_hls
if not self._stream_settings.ll_hls:
adjusted_dts = packet.dts
assert self._segment assert self._segment
self._memory_file.seek(self._memory_file_pos) self._memory_file.seek(self._memory_file_pos)
self._hass.loop.call_soon_threadsafe( self._hass.loop.call_soon_threadsafe(
@ -445,10 +473,7 @@ def get_audio_bitstream_filter(
_LOGGER.debug( _LOGGER.debug(
"ADTS AAC detected. Adding aac_adtstoaac bitstream filter" "ADTS AAC detected. Adding aac_adtstoaac bitstream filter"
) )
bsf = av.BitStreamFilter("aac_adtstoasc") return av.BitStreamFilter("aac_adtstoasc")
bsf_context = bsf.create()
bsf_context.set_input_stream(audio_stream)
return bsf_context
break break
return None return None
@ -489,7 +514,12 @@ def stream_worker(
audio_stream = None audio_stream = None
# Disable ll-hls for hls inputs # Disable ll-hls for hls inputs
if container.format.name == "hls": if container.format.name == "hls":
stream_settings.ll_hls = False for field in attr.fields(StreamSettings):
setattr(
stream_settings,
field.name,
getattr(STREAM_SETTINGS_NON_LL_HLS, field.name),
)
stream_state.diagnostics.set_value("container_format", container.format.name) stream_state.diagnostics.set_value("container_format", container.format.name)
stream_state.diagnostics.set_value("video_codec", video_stream.name) stream_state.diagnostics.set_value("video_codec", video_stream.name)
if audio_stream: if audio_stream:

View File

@ -755,7 +755,9 @@ async def test_durations(hass, worker_finished_stream):
}, },
) )
source = generate_h264_video(duration=SEGMENT_DURATION + 1) source = generate_h264_video(
duration=round(SEGMENT_DURATION + target_part_duration + 1)
)
worker_finished, mock_stream = worker_finished_stream worker_finished, mock_stream = worker_finished_stream
with patch("homeassistant.components.stream.Stream", wraps=mock_stream): with patch("homeassistant.components.stream.Stream", wraps=mock_stream):