Fix AVError and allow more missing DTS packets in stream (#42277)

* Fix AVError and allow more missing DTS in stream init

* Avoid recreating container demux iterator

* Relax missing dts requirement

* Fix spelling error

* Fix error message count

* Add timeout to av.open read

* Increase STREAM_TIMEOUT to 30
This commit is contained in:
uvjustin 2020-10-25 10:55:12 +08:00 committed by GitHub
parent 09bd449698
commit f8d87bff16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 23 deletions

View File

@ -20,3 +20,6 @@ MIN_SEGMENT_DURATION = 1.5 # Each segment is at least this many seconds
PACKETS_TO_WAIT_FOR_AUDIO = 20 # Some streams have an audio stream with no audio
MAX_TIMESTAMP_GAP = 10000 # seconds - anything from 10 to 50000 is probably reasonable
MAX_MISSING_DTS = 6 # Number of packets missing DTS to allow
STREAM_TIMEOUT = 30 # Timeout for reading stream

View File

@ -6,7 +6,13 @@ import time
import av
from .const import MAX_TIMESTAMP_GAP, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
from .const import (
MAX_MISSING_DTS,
MAX_TIMESTAMP_GAP,
MIN_SEGMENT_DURATION,
PACKETS_TO_WAIT_FOR_AUDIO,
STREAM_TIMEOUT,
)
from .core import Segment, StreamBuffer
_LOGGER = logging.getLogger(__name__)
@ -62,7 +68,7 @@ def stream_worker(hass, stream, quit_event):
def _stream_worker_internal(hass, stream, quit_event):
"""Handle consuming streams."""
container = av.open(stream.source, options=stream.options)
container = av.open(stream.source, options=stream.options, timeout=STREAM_TIMEOUT)
try:
video_stream = container.streams.video[0]
except (KeyError, IndexError):
@ -81,13 +87,15 @@ def _stream_worker_internal(hass, stream, quit_event):
if audio_stream and audio_stream.profile is None:
audio_stream = None
# Iterator for demuxing
container_packets = None
# The presentation timestamps of the first packet in each stream we receive
# Use to adjust before muxing or outputting, but we don't adjust internally
first_pts = {}
# The decoder timestamps of the latest packet in each stream we processed
last_dts = None
# Keep track of consecutive packets without a dts to detect end of stream.
last_packet_was_without_dts = False
missing_dts = 0
# Holds the buffers for each stream provider
outputs = None
# Keep track of the number of segments we've processed
@ -102,8 +110,8 @@ def _stream_worker_internal(hass, stream, quit_event):
# 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815
def peek_first_pts():
nonlocal first_pts, audio_stream
missing_dts = False
nonlocal first_pts, audio_stream, container_packets
missing_dts = 0
def empty_stream_dict():
return {
@ -112,17 +120,20 @@ def _stream_worker_internal(hass, stream, quit_event):
}
try:
container_packets = container.demux((video_stream, audio_stream))
first_packet = empty_stream_dict()
first_pts = empty_stream_dict()
# Get to first video keyframe
while first_packet[video_stream] is None:
packet = next(container.demux())
packet = next(container_packets)
if (
packet.dts is None
): # Allow single packet with no dts, raise error on second
if missing_dts:
raise av.AVError
missing_dts = True
): # Allow MAX_MISSING_DTS packets with no dts, raise error on the next one
if missing_dts >= MAX_MISSING_DTS:
raise StopIteration(
f"Invalid data - got {MAX_MISSING_DTS+1} packets with missing DTS while initializing"
)
missing_dts += 1
continue
if packet.stream == video_stream and packet.is_keyframe:
first_packet[video_stream] = packet
@ -131,13 +142,15 @@ def _stream_worker_internal(hass, stream, quit_event):
while any(
[pts is None for pts in {**first_packet, **first_pts}.values()]
) and (len(initial_packets) < PACKETS_TO_WAIT_FOR_AUDIO):
packet = next(container.demux((video_stream, audio_stream)))
packet = next(container_packets)
if (
packet.dts is None
): # Allow single packet with no dts, raise error on second
if missing_dts:
raise av.AVError
missing_dts = True
): # Allow MAX_MISSING_DTS packet with no dts, raise error on the next one
if missing_dts >= MAX_MISSING_DTS:
raise StopIteration(
f"Invalid data - got {MAX_MISSING_DTS+1} packets with missing DTS while initializing"
)
missing_dts += 1
continue
if (
first_packet[packet.stream] is None
@ -223,16 +236,16 @@ def _stream_worker_internal(hass, stream, quit_event):
if len(initial_packets) > 0:
packet = initial_packets.popleft()
else:
packet = next(container.demux((video_stream, audio_stream)))
packet = next(container_packets)
if packet.dts is None:
_LOGGER.error("Stream packet without dts detected, skipping...")
# Allow a single packet without dts before terminating the stream.
if last_packet_was_without_dts:
# If we get a "flushing" packet, the stream is done
raise StopIteration("No dts in consecutive packets")
last_packet_was_without_dts = True
# Allow MAX_MISSING_DTS consecutive packets without dts. Terminate the stream on the next one.
if missing_dts >= MAX_MISSING_DTS:
raise StopIteration(
f"No dts in {MAX_MISSING_DTS+1} consecutive packets"
)
missing_dts += 1
continue
last_packet_was_without_dts = False
missing_dts = 0
except (av.AVError, StopIteration) as ex:
_LOGGER.error("Error demuxing stream: %s", str(ex))
finalize_stream()