From 9aaa92f366b5345f2297c43541a6daa2ac0e7ca7 Mon Sep 17 00:00:00 2001 From: uvjustin <46082645+uvjustin@users.noreply.github.com> Date: Mon, 1 Nov 2021 11:23:01 +0800 Subject: [PATCH] Improve part metadata in stream (#58822) --- homeassistant/components/stream/worker.py | 117 +++++++++++++--------- tests/components/stream/test_worker.py | 52 ++++++---- 2 files changed, 106 insertions(+), 63 deletions(-) diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 64a43f68aa0..881614b04a3 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -66,9 +66,15 @@ class SegmentBuffer: memory_file: BytesIO, sequence: int, input_vstream: av.video.VideoStream, - ) -> av.container.OutputContainer: - """Make a new av OutputContainer.""" - return av.open( + input_astream: av.audio.stream.AudioStream, + ) -> tuple[ + av.container.OutputContainer, + av.video.VideoStream, + av.audio.stream.AudioStream | None, + ]: + """Make a new av OutputContainer and add output streams.""" + add_audio = input_astream and input_astream.name in AUDIO_CODECS + container = av.open( memory_file, mode="w", format=SEGMENT_CONTAINER_FORMAT, @@ -93,19 +99,21 @@ class SegmentBuffer: # Create a fragment every TARGET_PART_DURATION. The data from each fragment is stored in # a "Part" that can be combined with the data from all the other "Part"s, plus an init # section, to reconstitute the data in a "Segment". - # frag_duration is the threshold for determining part boundaries, and the dts of the last - # packet in the part should correspond to a duration that is smaller than this value. - # However, as the part duration includes the duration of the last frame, the part duration - # will be equal to or greater than this value. - # We previously scaled this number down by .85 to account for this while keeping within - # the 15% variance allowed in part duration. 
However, this did not work when inputs had - # an audio stream - sometimes the fragment would get cut on the audio packet, causing - # the durations to actually be to short. - # The current approach is to use this frag_duration for creating the media while - # adjusting the metadata duration to keep the durations in the metadata below the - # part_target_duration threshold. + # The LL-HLS spec allows for a fragment's duration to be within the range [0.85x,1.0x] + # of the part target duration. We use the frag_duration option to tell ffmpeg to try to + # cut the fragments when they reach frag_duration. However, the resulting fragments can + # have variability in their durations and can end up being too short or too long. If + # there are two tracks, as in the case of a video feed with audio, the fragment cut seems + # to be done on the first track that crosses the desired threshold, and cutting on the + # audio track may result in a shorter video fragment than desired. Conversely, with a + # video track with no audio, the discrete nature of frames means that the frame at the + # end of a fragment will sometimes extend slightly beyond the desired frag_duration. + # Given this, our approach is to use a frag_duration near the upper end of the range for + # outputs with audio and a frag_duration at the lower end of the range for outputs with + # only video. 
"frag_duration": str( - self._stream_settings.part_target_duration * 1e6 + self._stream_settings.part_target_duration + * (98e4 if add_audio else 9e5) ), } if self._stream_settings.ll_hls @@ -113,6 +121,12 @@ class SegmentBuffer: ), }, ) + output_vstream = container.add_stream(template=input_vstream) + # Check if audio is requested + output_astream = None + if add_audio: + output_astream = container.add_stream(template=input_astream) + return container, output_vstream, output_astream def set_streams( self, @@ -128,26 +142,22 @@ class SegmentBuffer: """Initialize a new stream segment.""" # Keep track of the number of segments we've processed self._sequence += 1 - self._segment_start_dts = video_dts + self._part_start_dts = self._segment_start_dts = video_dts self._segment = None self._memory_file = BytesIO() self._memory_file_pos = 0 - self._av_output = self.make_new_av( + ( + self._av_output, + self._output_video_stream, + self._output_audio_stream, + ) = self.make_new_av( memory_file=self._memory_file, sequence=self._sequence, input_vstream=self._input_video_stream, - ) - self._output_video_stream = self._av_output.add_stream( - template=self._input_video_stream + input_astream=self._input_audio_stream, ) if self._output_video_stream.name == "hevc": self._output_video_stream.codec_tag = "hvc1" - # Check if audio is requested - self._output_audio_stream = None - if self._input_audio_stream and self._input_audio_stream.name in AUDIO_CODECS: - self._output_audio_stream = self._av_output.add_stream( - template=self._input_audio_stream - ) def mux_packet(self, packet: av.Packet) -> None: """Mux a packet to the appropriate output stream.""" @@ -186,13 +196,9 @@ class SegmentBuffer: # Fetch the latest StreamOutputs, which may have changed since the # worker started. 
stream_outputs=self._outputs_callback().values(), - start_time=self._start_time - + datetime.timedelta( - seconds=float(self._segment_start_dts * packet.time_base) - ), + start_time=self._start_time, ) self._memory_file_pos = self._memory_file.tell() - self._part_start_dts = self._segment_start_dts else: # These are the ends of the part segments self.flush(packet, last_part=False) @@ -201,17 +207,23 @@ class SegmentBuffer: If last_part is True, also close the segment, give it a duration, and clean up the av_output and memory_file. + There are two different ways to enter this function, and when + last_part is True, packet has not yet been muxed, while when + last_part is False, the packet has already been muxed. However, + in both cases, packet is the next packet and is not included in + the Part. + This function writes the duration metadata for the Part and + for the Segment. However, as the fragmentation done by ffmpeg + may result in fragment durations which fall outside the + [0.85x,1.0x] tolerance band allowed by LL-HLS, we need to fudge + some durations a bit by reporting them as being within that + range. + Note that repeated adjustments may cause drift between the part + durations in the metadata and those in the media and result in + playback issues in some clients. """ - # In some cases using the current packet's dts (which is the start - # dts of the next part) to calculate the part duration will result in a - # value which exceeds the part_target_duration. This can muck up the - # duration of both this part and the next part. An easy fix is to just - # use the current packet dts and cap it by the part target duration. - # The adjustment may cause a drift between this adjusted duration - # (used in the metadata) and the media duration, but the drift should be - # automatically corrected when the part duration cleanly divides the - # framerate. 
- current_dts = min( + # Part durations should not exceed the part target duration + adjusted_dts = min( packet.dts, self._part_start_dts + self._stream_settings.part_target_duration / packet.time_base, @@ -220,29 +232,44 @@ class SegmentBuffer: # Closing the av_output will write the remaining buffered data to the # memory_file as a new moof/mdat. self._av_output.close() + elif not self._part_has_keyframe: + # Parts which are not the last part or an independent part should + # not have durations below 0.85 of the part target duration. + adjusted_dts = max( + adjusted_dts, + self._part_start_dts + + 0.85 * self._stream_settings.part_target_duration / packet.time_base, + ) assert self._segment self._memory_file.seek(self._memory_file_pos) self._hass.loop.call_soon_threadsafe( self._segment.async_add_part, Part( - duration=float((current_dts - self._part_start_dts) * packet.time_base), + duration=float( + (adjusted_dts - self._part_start_dts) * packet.time_base + ), has_keyframe=self._part_has_keyframe, data=self._memory_file.read(), ), - float((current_dts - self._segment_start_dts) * packet.time_base) + ( + segment_duration := float( + (adjusted_dts - self._segment_start_dts) * packet.time_base + ) + ) if last_part else 0, ) if last_part: # If we've written the last part, we can close the memory_file. self._memory_file.close() # We don't need the BytesIO object anymore + self._start_time += datetime.timedelta(seconds=segment_duration) # Reinitialize - self.reset(current_dts) + self.reset(packet.dts) else: # For the last part, these will get set again elsewhere so we can skip # setting them here. 
self._memory_file_pos = self._memory_file.tell() - self._part_start_dts = current_dts + self._part_start_dts = adjusted_dts self._part_has_keyframe = False def discontinuity(self) -> None: diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 7c9ad91f543..97fe4bd0d37 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -677,6 +677,10 @@ async def test_worker_log(hass, caplog): async def test_durations(hass, record_worker_sync): """Test that the duration metadata matches the media.""" + + # Use a target part duration which has a slight mismatch + # with the incoming frame rate to better expose problems. + target_part_duration = TEST_PART_DURATION - 0.01 await async_setup_component( hass, "stream", @@ -684,12 +688,12 @@ async def test_durations(hass, record_worker_sync): "stream": { CONF_LL_HLS: True, CONF_SEGMENT_DURATION: SEGMENT_DURATION, - CONF_PART_DURATION: TEST_PART_DURATION, + CONF_PART_DURATION: target_part_duration, } }, ) - source = generate_h264_video() + source = generate_h264_video(duration=SEGMENT_DURATION + 1) stream = create_stream(hass, source, {}) # use record_worker_sync to grab output segments @@ -702,25 +706,37 @@ async def test_durations(hass, record_worker_sync): # check that the Part duration metadata matches the durations in the media running_metadata_duration = 0 for segment in complete_segments: - for part in segment.parts: + av_segment = av.open(io.BytesIO(segment.init + segment.get_data())) + av_segment.close() + for part_num, part in enumerate(segment.parts): av_part = av.open(io.BytesIO(segment.init + part.data)) running_metadata_duration += part.duration - # av_part.duration actually returns the dts of the first packet of - # the next av_part. When we normalize this by av.time_base we get - # the running duration of the media. - # The metadata duration is slightly different. 
The worker has - # some flexibility of where to set each metadata boundary, and - # when the media's duration is slightly too long, the metadata - # duration is adjusted down. This means that the running metadata - # duration may be up to one video frame duration smaller than the - # part duration. - assert running_metadata_duration < av_part.duration / av.time_base + 1e-6 - assert ( - running_metadata_duration - > av_part.duration / av.time_base - - 1 / av_part.streams.video[0].rate - - 1e-6 + # av_part.duration actually returns the dts of the first packet of the next + # av_part. When we normalize this by av.time_base we get the running + # duration of the media. + # The metadata duration may differ slightly from the media duration. + # The worker has some flexibility of where to set each metadata boundary, + # and when the media's duration is slightly too long or too short, the + # metadata duration may be adjusted up or down. + # We check here that the divergence between the metadata duration and the + # media duration is not too large (2 frames seems reasonable here). + assert math.isclose( + (av_part.duration - av_part.start_time) / av.time_base, + part.duration, + abs_tol=2 / av_part.streams.video[0].rate + 1e-6, ) + # Also check that the sum of the durations so far matches the last dts + # in the media. + assert math.isclose( + running_metadata_duration, + av_part.duration / av.time_base, + abs_tol=1e-6, + ) + # And check that the metadata duration is between 0.85x and 1.0x of + # the part target duration + if not (part.has_keyframe or part_num == len(segment.parts) - 1): + assert part.duration > 0.85 * target_part_duration - 1e-6 + assert part.duration < target_part_duration + 1e-6 av_part.close() # check that the Part durations are consistent with the Segment durations for segment in complete_segments: