From dc2d0b92973a16e28438fa004d53824d43c43acf Mon Sep 17 00:00:00 2001
From: uvjustin <46082645+uvjustin@users.noreply.github.com>
Date: Thu, 20 Aug 2020 11:18:54 +0800
Subject: [PATCH] Add audio to stream (#38846)

* Add audio to stream component

* Use container options to do most fmp4 formatting

* Add test for treatment of different audio inputs

* Add test for treatment of different audio inputs

* pcm_mulaw frames should be s16

* Use seek to get BytesIO length

* Remove unused utcnow

* Remove peek_next_audio_pts

* only demux audio and video packets - ignoring data and subtitle streams

Co-authored-by: Jason Hunter
---
 homeassistant/components/stream/const.py     |   4 +-
 homeassistant/components/stream/core.py      |  11 +-
 homeassistant/components/stream/fmp4utils.py |  18 +-
 homeassistant/components/stream/hls.py       |  35 ++-
 homeassistant/components/stream/recorder.py  |  31 +-
 homeassistant/components/stream/worker.py    | 292 ++++++++++---------
 tests/components/stream/common.py            |  51 +++-
 tests/components/stream/test_recorder.py     |  41 ++-
 8 files changed, 287 insertions(+), 196 deletions(-)

diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py
index 7bc900c25e6..91c4018d899 100644
--- a/homeassistant/components/stream/const.py
+++ b/homeassistant/components/stream/const.py
@@ -15,7 +15,7 @@ OUTPUT_FORMATS = ["hls"]
 
 FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"}
 
-AUDIO_SAMPLE_RATE = 44100
-
 MAX_SEGMENTS = 3  # Max number of segments to keep around
 MIN_SEGMENT_DURATION = 1.5  # Each segment is at least this many seconds
+
+PACKETS_TO_WAIT_FOR_AUDIO = 20  # Some streams have an audio stream with no audio
diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py
index 06e1e659202..5e4e85ceea6 100644
--- a/homeassistant/components/stream/core.py
+++ b/homeassistant/components/stream/core.py
@@ -2,7 +2,7 @@
 import asyncio
 from collections import deque
 import io
-from typing import Any, List
+from typing import Any, Callable, List
 
 from aiohttp import web
 import attr
@@ -34,7 +34,6 @@ class Segment:
     sequence: int = attr.ib()
     segment: io.BytesIO = attr.ib()
     duration: float = attr.ib()
-    start_pts: tuple = attr.ib()
 
 
 class StreamOutput:
@@ -61,8 +60,8 @@ class StreamOutput:
         return None
 
     @property
-    def audio_codec(self) -> str:
-        """Return desired audio codec."""
+    def audio_codecs(self) -> str:
+        """Return desired audio codecs."""
         return None
 
     @property
@@ -71,8 +70,8 @@ class StreamOutput:
         return None
 
     @property
-    def container_options(self) -> dict:
-        """Return container options."""
+    def container_options(self) -> Callable[[int], dict]:
+        """Return Callable which takes a sequence number and returns container options."""
         return None
 
     @property
diff --git a/homeassistant/components/stream/fmp4utils.py b/homeassistant/components/stream/fmp4utils.py
index 025e576bc06..00603807215 100644
--- a/homeassistant/components/stream/fmp4utils.py
+++ b/homeassistant/components/stream/fmp4utils.py
@@ -5,7 +5,8 @@ import io
 def find_box(segment: io.BytesIO, target_type: bytes, box_start: int = 0) -> int:
     """Find location of first box (or sub_box if box_start provided) of given type."""
     if box_start == 0:
-        box_end = len(segment.getbuffer())
+        box_end = segment.seek(0, io.SEEK_END)
+        segment.seek(0)
         index = 0
     else:
         segment.seek(box_start)
@@ -29,22 +30,9 @@ def get_init(segment: io.BytesIO) -> bytes:
     return segment.read(moof_location)
 
 
-def get_m4s(segment: io.BytesIO, start_pts: tuple, sequence: int) -> bytes:
+def get_m4s(segment: io.BytesIO, sequence: int) -> bytes:
     """Get m4s section from fragmented mp4."""
     moof_location = next(find_box(segment, b"moof"))
     mfra_location = next(find_box(segment, b"mfra"))
-    # adjust mfhd sequence number in moof
-    view = segment.getbuffer()
-    view[moof_location + 20 : moof_location + 24] = sequence.to_bytes(4, "big")
-    # adjust tfdt in video traf
-    traf_finder = find_box(segment, b"traf", moof_location)
-    traf_location = next(traf_finder)
-    tfdt_location = next(find_box(segment, b"tfdt", traf_location))
-    view[tfdt_location + 12 : tfdt_location + 20] = start_pts[0].to_bytes(8, "big")
-    # adjust tfdt in audio traf
-    traf_location = next(traf_finder)
-    tfdt_location = next(find_box(segment, b"tfdt", traf_location))
-    view[tfdt_location + 12 : tfdt_location + 20] = start_pts[1].to_bytes(8, "big")
-    # done adjusting
     segment.seek(moof_location)
     return segment.read(mfra_location - moof_location)
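The fmp4utils helpers above lean on the flat layout of a fragmented MP4: every ISO BMFF box opens with a 4-byte big-endian size followed by a 4-byte type tag, which is why get_init can return everything before the first moof and get_m4s the bytes from moof up to mfra. A minimal, self-contained sketch of that box walk (the iter_boxes helper and the synthetic boxes are illustrative, not part of the component):

    import io

    def iter_boxes(buf: io.BytesIO):
        """Yield (offset, box type, size) for each top-level MP4 box."""
        end = buf.seek(0, io.SEEK_END)
        index = 0
        while index + 8 <= end:
            buf.seek(index)
            size = int.from_bytes(buf.read(4), "big")
            box_type = buf.read(4)
            yield index, box_type, size
            if size == 0:  # a zero size means the box extends to end of file
                break
            index += size

    # Two synthetic boxes: a 16-byte "ftyp" and a 12-byte "moof"
    ftyp = (16).to_bytes(4, "big") + b"ftyp" + b"\x00" * 8
    moof = (12).to_bytes(4, "big") + b"moof" + b"\x00" * 4
    boxes = list(iter_boxes(io.BytesIO(ftyp + moof)))
    assert boxes == [(0, b"ftyp", 16), (16, b"moof", 12)]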
diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py
index 66cf3583b60..bd7a6966c9d 100644
--- a/homeassistant/components/stream/hls.py
+++ b/homeassistant/components/stream/hls.py
@@ -1,8 +1,9 @@
 """Provide functionality to stream HLS."""
+from typing import Callable
+
 from aiohttp import web
 
 from homeassistant.core import callback
-from homeassistant.util.dt import utcnow
 
 from .const import FORMAT_CONTENT_TYPE
 from .core import PROVIDERS, StreamOutput, StreamView
@@ -35,7 +36,7 @@ class HlsPlaylistView(StreamView):
         await track.recv()
         headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
         return web.Response(
-            body=renderer.render(track, utcnow()).encode("utf-8"), headers=headers
+            body=renderer.render(track).encode("utf-8"), headers=headers
         )
 
 
@@ -71,8 +72,7 @@ class HlsSegmentView(StreamView):
             return web.HTTPNotFound()
         headers = {"Content-Type": "video/iso.segment"}
         return web.Response(
-            body=get_m4s(segment.segment, segment.start_pts, int(sequence)),
-            headers=headers,
+            body=get_m4s(segment.segment, int(sequence)), headers=headers,
         )
 
 
@@ -90,11 +90,10 @@ class M3U8Renderer:
             "#EXT-X-VERSION:7",
             f"#EXT-X-TARGETDURATION:{track.target_duration}",
             '#EXT-X-MAP:URI="init.mp4"',
-            "#EXT-X-INDEPENDENT-SEGMENTS",
         ]
 
     @staticmethod
-    def render_playlist(track, start_time):
+    def render_playlist(track):
         """Render playlist."""
         segments = track.segments
 
@@ -114,13 +113,9 @@ class M3U8Renderer:
 
         return playlist
 
-    def render(self, track, start_time):
+    def render(self, track):
         """Render M3U8 file."""
-        lines = (
-            ["#EXTM3U"]
-            + self.render_preamble(track)
-            + self.render_playlist(track, start_time)
-        )
+        lines = ["#EXTM3U"] + self.render_preamble(track) + self.render_playlist(track)
         return "\n".join(lines) + "\n"
 
 
@@ -139,9 +134,9 @@ class HlsStreamOutput(StreamOutput):
         return "mp4"
 
     @property
-    def audio_codec(self) -> str:
-        """Return desired audio codec."""
-        return "aac"
+    def audio_codecs(self) -> str:
+        """Return desired audio codecs."""
+        return {"aac", "ac3", "mp3"}
 
     @property
     def video_codecs(self) -> tuple:
@@ -149,6 +144,10 @@ class HlsStreamOutput(StreamOutput):
         return {"hevc", "h264"}
 
     @property
-    def container_options(self) -> dict:
-        """Return container options."""
-        return {"movflags": "frag_custom+empty_moov+default_base_moof"}
+    def container_options(self) -> Callable[[int], dict]:
+        """Return Callable which takes a sequence number and returns container options."""
+        return lambda sequence: {
+            "movflags": "frag_custom+empty_moov+default_base_moof+skip_sidx+frag_discont",
+            "avoid_negative_ts": "make_non_negative",
+            "fragment_index": str(sequence),
+        }
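Two related changes meet here: the mfhd/tfdt rewriting that get_m4s used to do by hand is now delegated to the muxer through the per-sequence fragment_index container option, and render no longer needs a wall-clock start time. For orientation, a rough sketch of the playlist text the renderer produces; the StubTrack and the EXTINF durations are illustrative, and the unchanged body of render_playlist is elided by the hunk above:

    class StubTrack:
        """Illustrative stand-in for an HLS StreamOutput track."""

        target_duration = 2
        segments = [7, 8, 9]


    def render_sketch(track):
        """Approximate the M3U8Renderer output for a stub track."""
        lines = [
            "#EXTM3U",
            "#EXT-X-VERSION:7",
            f"#EXT-X-TARGETDURATION:{track.target_duration}",
            '#EXT-X-MAP:URI="init.mp4"',
        ]
        for sequence in track.segments:
            lines.append(f"#EXTINF:{float(track.target_duration)},")
            lines.append(f"./segment/{sequence}.m4s")
        return "\n".join(lines) + "\n"


    print(render_sketch(StubTrack()))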
diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py
index 90d6cdccfc1..0a93ea0bc92 100644
--- a/homeassistant/components/stream/recorder.py
+++ b/homeassistant/components/stream/recorder.py
@@ -15,16 +15,17 @@ def async_setup_recorder(hass):
     """Only here so Provider Registry works."""
 
 
-def recorder_save_worker(file_out: str, segments: List[Segment]):
+def recorder_save_worker(file_out: str, segments: List[Segment], container_format: str):
     """Handle saving stream."""
-    first_pts = segments[0].start_pts[0]
-    output = av.open(file_out, "w")
+    first_pts = {"video": None, "audio": None}
+    output = av.open(file_out, "w", format=container_format)
     output_v = None
+    output_a = None
 
     for segment in segments:
         # Seek to beginning and open segment
         segment.segment.seek(0)
-        source = av.open(segment.segment, "r", format="mp4")
+        source = av.open(segment.segment, "r", format=container_format)
         source_v = source.streams.video[0]
 
         # Add output streams
@@ -32,16 +33,18 @@ def recorder_save_worker(file_out: str, segments: List[Segment]):
             output_v = output.add_stream(template=source_v)
             context = output_v.codec_context
             context.flags |= "GLOBAL_HEADER"
+        if not output_a and len(source.streams.audio) > 0:
+            source_a = source.streams.audio[0]
+            output_a = output.add_stream(template=source_a)
 
         # Remux video
-        for packet in source.demux(source_v):
+        for packet in source.demux():
             if packet is not None and packet.dts is not None:
-                if packet.pts < segment.start_pts[0]:
-                    packet.pts += segment.start_pts[0]
-                    packet.dts += segment.start_pts[0]
-                packet.pts -= first_pts
-                packet.dts -= first_pts
-                packet.stream = output_v
+                if first_pts[packet.stream.type] is None:
+                    first_pts[packet.stream.type] = packet.pts
+                packet.pts -= first_pts[packet.stream.type]
+                packet.dts -= first_pts[packet.stream.type]
+                packet.stream = output_v if packet.stream.type == "video" else output_a
                 output.mux(packet)
 
         source.close()
@@ -70,9 +73,9 @@ class RecorderOutput(StreamOutput):
         return "mp4"
 
     @property
-    def audio_codec(self) -> str:
+    def audio_codecs(self) -> str:
         """Return desired audio codec."""
-        return "aac"
+        return {"aac", "ac3", "mp3"}
 
     @property
     def video_codecs(self) -> tuple:
@@ -96,7 +99,7 @@ class RecorderOutput(StreamOutput):
         thread = threading.Thread(
             name="recorder_save_worker",
             target=recorder_save_worker,
-            args=(self.video_path, self._segments),
+            args=(self.video_path, self._segments, self.format),
         )
         thread.start()
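The recorder keeps one offset per stream type because pts values are only comparable within their own stream's time base; a single scalar offset taken from video would corrupt the audio timeline. A worked example with illustrative numbers:

    from fractions import Fraction

    # Hypothetical first packets captured at the same instant, 10 s into the feed:
    video_pts, video_tb = 900_000, Fraction(1, 90_000)  # 900000 * 1/90000 = 10.0 s
    audio_pts, audio_tb = 80_000, Fraction(1, 8_000)    #  80000 * 1/8000  = 10.0 s

    assert video_pts * video_tb == audio_pts * audio_tb == 10
    # Rebasing audio with the *video* offset would shift it by -102.5 s:
    assert (audio_pts - video_pts) * audio_tb == Fraction(-205, 2)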
diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py
index 461d29de421..bed8b25dbd1 100644
--- a/homeassistant/components/stream/worker.py
+++ b/homeassistant/components/stream/worker.py
@@ -1,54 +1,37 @@
 """Provides the worker thread needed for processing streams."""
-from fractions import Fraction
+from collections import deque
 import io
 import logging
 
 import av
 
-from .const import AUDIO_SAMPLE_RATE, MIN_SEGMENT_DURATION
+from .const import MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
 from .core import Segment, StreamBuffer
 
 _LOGGER = logging.getLogger(__name__)
 
 
-def generate_audio_frame():
-    """Generate a blank audio frame."""
-
-    audio_frame = av.AudioFrame(format="dbl", layout="mono", samples=1024)
-    # audio_bytes = b''.join(b'\x00\x00\x00\x00\x00\x00\x00\x00'
-    #                        for i in range(0, 1024))
-    audio_bytes = b"\x00\x00\x00\x00\x00\x00\x00\x00" * 1024
-    audio_frame.planes[0].update(audio_bytes)
-    audio_frame.sample_rate = AUDIO_SAMPLE_RATE
-    audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE)
-    return audio_frame
-
-
-def create_stream_buffer(stream_output, video_stream, audio_frame):
+def create_stream_buffer(stream_output, video_stream, audio_stream, sequence):
     """Create a new StreamBuffer."""
-    a_packet = None
     segment = io.BytesIO()
+    container_options = (
+        stream_output.container_options(sequence)
+        if stream_output.container_options
+        else {}
+    )
     output = av.open(
         segment,
         mode="w",
         format=stream_output.format,
-        container_options={
-            "video_track_timescale": str(int(1 / video_stream.time_base)),
-            **(stream_output.container_options or {}),
-        },
+        container_options=container_options,
     )
     vstream = output.add_stream(template=video_stream)
     # Check if audio is requested
     astream = None
-    if stream_output.audio_codec:
-        astream = output.add_stream(stream_output.audio_codec, AUDIO_SAMPLE_RATE)
-        # Need to do it multiple times for some reason
-        while not a_packet:
-            a_packets = astream.encode(audio_frame)
-            if a_packets:
-                a_packet = a_packets[0]
-    return (a_packet, StreamBuffer(segment, output, vstream, astream))
+    if audio_stream and audio_stream.name in stream_output.audio_codecs:
+        astream = output.add_stream(template=audio_stream)
+    return StreamBuffer(segment, output, vstream, astream)
 
 
 def stream_worker(hass, stream, quit_event):
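create_stream_buffer now mirrors the input with add_stream(template=...) for audio as well as video, so packets are remuxed rather than transcoded and the silence generator disappears entirely. A hedged sketch of how demuxed packets could be routed into such a buffer (simplified; the real dispatch lives in stream_worker below):

    def fill_buffer(container, buffer):
        """Route demuxed PyAV packets into a StreamBuffer (illustrative only)."""
        for packet in container.demux():
            if packet.dts is None:
                continue  # drop packets the demuxer could not timestamp
            # Pick the output stream by type; astream is None when audio was not requested
            target = buffer.vstream if packet.stream.type == "video" else buffer.astream
            if target is None:
                continue
            packet.stream = target
            buffer.output.mux(packet)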
@@ -60,32 +43,133 @@ def stream_worker(hass, stream, quit_event):
     except (KeyError, IndexError):
         _LOGGER.error("Stream has no video")
         return
+    try:
+        audio_stream = container.streams.audio[0]
+    except (KeyError, IndexError):
+        audio_stream = None
 
-    audio_frame = generate_audio_frame()
-
-    first_packet = True
-    # Holds the buffers for each stream provider
-    outputs = {}
-    # Keep track of the number of segments we've processed
-    sequence = 1
-    # Holds the generated silence that needs to be muxed into the output
-    audio_packets = {}
-    # The presentation timestamp of the first video packet we receive
-    first_pts = 0
-    # The decoder timestamp of the latest packet we processed
+    # The presentation timestamps of the first packet in each stream we receive
+    # Use to adjust before muxing or outputting, but we don't adjust internally
+    first_pts = {}
+    # The decoder timestamps of the latest packet in each stream we processed
     last_dts = None
     # Keep track of consecutive packets without a dts to detect end of stream.
     last_packet_was_without_dts = False
-    # The pts at the beginning of the segment
-    segment_start_v_pts = 0
-    segment_start_a_pts = 0
+    # Holds the buffers for each stream provider
+    outputs = None
+    # Keep track of the number of segments we've processed
+    sequence = 0
+    # The video pts at the beginning of the segment
+    segment_start_pts = None
+    # Because of problems 1 and 2 below, we need to store the first few packets and replay them
+    initial_packets = deque()
+
+    # Have to work around two problems with RTSP feeds in ffmpeg
+    # 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
+    # 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815
+
+    def peek_first_pts():
+        nonlocal first_pts, audio_stream
+
+        def empty_stream_dict():
+            return {
+                video_stream: None,
+                **({audio_stream: None} if audio_stream else {}),
+            }
+
+        try:
+            first_packet = empty_stream_dict()
+            first_pts = empty_stream_dict()
+            # Get to first video keyframe
+            while first_packet[video_stream] is None:
+                packet = next(container.demux())
+                if packet.stream == video_stream and packet.is_keyframe:
+                    first_packet[video_stream] = packet
+                    initial_packets.append(packet)
+            # Get first_pts from subsequent frame to first keyframe
+            while any(
+                [pts is None for pts in {**first_packet, **first_pts}.values()]
+            ) and (len(initial_packets) < PACKETS_TO_WAIT_FOR_AUDIO):
+                packet = next(container.demux((video_stream, audio_stream)))
+                if (
+                    first_packet[packet.stream] is None
+                ):  # actually video already found above so only for audio
+                    if packet.is_keyframe:
+                        first_packet[packet.stream] = packet
+                    else:  # Discard leading non-keyframes
+                        continue
+                else:  # This is the second frame to calculate first_pts from
+                    if first_pts[packet.stream] is None:
+                        first_pts[packet.stream] = packet.dts - packet.duration
+                        first_packet[packet.stream].pts = first_pts[packet.stream]
+                        first_packet[packet.stream].dts = first_pts[packet.stream]
+                initial_packets.append(packet)
+            if audio_stream and first_packet[audio_stream] is None:
+                _LOGGER.warning(
+                    "Audio stream not found"
+                )  # Some streams declare an audio stream and never send any packets
+                del first_pts[audio_stream]
+                audio_stream = None
+
+        except (av.AVError, StopIteration) as ex:
+            # End of stream, clear listeners and stop thread
+            for fmt, _ in outputs.items():
+                hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
+            _LOGGER.error(
+                "Error demuxing stream while finding first packet: %s", str(ex)
+            )
+            quit_event.set()
+
+    def initialize_segment(video_pts):
+        """Reset some variables and initialize outputs for each segment."""
+        nonlocal outputs, sequence, segment_start_pts
+        # Clear outputs and increment sequence
+        outputs = {}
+        sequence += 1
+        segment_start_pts = video_pts
+        for stream_output in stream.outputs.values():
+            if video_stream.name not in stream_output.video_codecs:
+                continue
+            buffer = create_stream_buffer(
+                stream_output, video_stream, audio_stream, sequence
+            )
+            outputs[stream_output.name] = (
+                buffer,
+                {video_stream: buffer.vstream, audio_stream: buffer.astream},
+            )
+
+    def mux_video_packet(packet):
+        # adjust pts and dts before muxing
+        packet.pts -= first_pts[video_stream]
+        packet.dts -= first_pts[video_stream]
+        # mux packets to each buffer
+        for buffer, output_streams in outputs.values():
+            # Assign the packet to the new stream & mux
+            packet.stream = output_streams[video_stream]
+            buffer.output.mux(packet)
+
+    def mux_audio_packet(packet):
+        # almost the same as muxing video but add extra check
+        # adjust pts and dts before muxing
+        packet.pts -= first_pts[audio_stream]
+        packet.dts -= first_pts[audio_stream]
+        for buffer, output_streams in outputs.values():
+            # Assign the packet to the new stream & mux
+            if output_streams.get(audio_stream):
+                packet.stream = output_streams[audio_stream]
+                buffer.output.mux(packet)
+
+    peek_first_pts()
+    last_dts = {k: v - 1 for k, v in first_pts.items()}
+    initialize_segment(first_pts[video_stream])
 
     while not quit_event.is_set():
         try:
-            packet = next(container.demux(video_stream))
+            if len(initial_packets) > 0:
+                packet = initial_packets.popleft()
+            else:
+                packet = next(container.demux((video_stream, audio_stream)))
             if packet.dts is None:
-                if first_packet:
-                    continue
                 _LOGGER.error("Stream packet without dts detected, skipping...")
                 # Allow a single packet without dts before terminating the stream.
                 if last_packet_was_without_dts:
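The key trick in peek_first_pts is the back-extrapolation: the first frame of an RTSP feed often carries garbage timestamps (ffmpeg ticket 5018), so the worker waits for the second packet of each stream and re-stamps the first one exactly one duration earlier. With illustrative 90 kHz numbers:

    # Second packet of a stream: dts=6000, duration=3000 (30 fps at a 90 kHz time base)
    second_dts, duration = 6000, 3000

    # The first keyframe's unreliable pts/dts are replaced by extrapolating back:
    first_pts = second_dts - duration
    assert first_pts == 3000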
@@ -101,101 +185,35 @@ def stream_worker(hass, stream, quit_event):
             _LOGGER.error("Error demuxing stream: %s", str(ex))
             break
 
-        # Skip non monotonically increasing dts in feed
-        if not first_packet and last_dts >= packet.dts:
+        # Discard packet if dts is not monotonic
+        if packet.dts <= last_dts[packet.stream]:
             continue
-        last_dts = packet.dts
 
-        # Reset timestamps from a 0 time base for this stream
-        packet.dts -= first_pts
-        packet.pts -= first_pts
+        # Check for end of segment
+        if packet.stream == video_stream and packet.is_keyframe:
+            segment_duration = (packet.pts - segment_start_pts) * packet.time_base
+            if segment_duration >= MIN_SEGMENT_DURATION:
+                # Save segment to outputs
+                for fmt, (buffer, _) in outputs.items():
+                    buffer.output.close()
+                    if stream.outputs.get(fmt):
+                        hass.loop.call_soon_threadsafe(
+                            stream.outputs[fmt].put,
+                            Segment(sequence, buffer.segment, segment_duration,),
+                        )
 
-        # Reset segment on keyframe after we reach desired segment duration
-        if (
-            packet.is_keyframe
-            and (packet.pts - segment_start_v_pts) * packet.time_base
-            >= MIN_SEGMENT_DURATION
-        ):
-            # Calculate the segment duration by multiplying the difference of the next and the current
-            # keyframe presentation timestamps by the time base, which gets us total seconds.
-            segment_duration = (packet.pts - segment_start_v_pts) * packet.time_base
-            # Save segment to outputs
-            for fmt, buffer in outputs.items():
-                buffer.output.close()
-                del audio_packets[buffer.astream]
-                if stream.outputs.get(fmt):
-                    hass.loop.call_soon_threadsafe(
-                        stream.outputs[fmt].put,
-                        Segment(
-                            sequence,
-                            buffer.segment,
-                            segment_duration,
-                            (segment_start_v_pts, segment_start_a_pts),
-                        ),
-                    )
+                # Reinitialize
+                initialize_segment(packet.pts)
 
-            # Clear outputs and increment sequence
-            outputs = {}
-            if not first_packet:
-                sequence += 1
-            segment_start_v_pts = packet.pts
-            segment_start_a_pts = int(
-                packet.pts * packet.time_base * AUDIO_SAMPLE_RATE
-            )
-
-            # Initialize outputs
-            for stream_output in stream.outputs.values():
-                if video_stream.name not in stream_output.video_codecs:
-                    continue
-
-                a_packet, buffer = create_stream_buffer(
-                    stream_output, video_stream, audio_frame
-                )
-                audio_packets[buffer.astream] = a_packet
-                outputs[stream_output.name] = buffer
-
-        # First video packet tends to have a weird dts/pts
-        if first_packet:
-            # If we are attaching to a live stream that does not reset
-            # timestamps for us, we need to do it ourselves by recording
-            # the first presentation timestamp and subtracting it from
-            # subsequent packets we receive.
-            if (packet.pts * packet.time_base) > 1:
-                first_pts = packet.pts
-            packet.dts = 0
-            packet.pts = 0
-            first_packet = False
-
-        # Store packets on each output
-        for buffer in outputs.values():
-            # Check if the format requires audio
-            if audio_packets.get(buffer.astream):
-                a_packet = audio_packets[buffer.astream]
-                a_time_base = a_packet.time_base
-
-                # Determine video start timestamp and duration
-                video_start = packet.pts * packet.time_base
-                video_duration = packet.duration * packet.time_base
-
-                if packet.is_keyframe:
-                    # Set first audio packet in sequence to equal video pts
-                    a_packet.pts = int(video_start / a_time_base)
-                    a_packet.dts = int(video_start / a_time_base)
-
-                # Determine target end timestamp for audio
-                target_pts = int((video_start + video_duration) / a_time_base)
-                while a_packet.pts < target_pts:
-                    # Mux audio packet and adjust points until target hit
-                    buffer.output.mux(a_packet)
-                    a_packet.pts += a_packet.duration
-                    a_packet.dts += a_packet.duration
-                audio_packets[buffer.astream] = a_packet
-
-            # Assign the video packet to the new stream & mux
-            packet.stream = buffer.vstream
-            buffer.output.mux(packet)
+        # Update last_dts processed
+        last_dts[packet.stream] = packet.dts
+        # mux video packets immediately, save audio packets to be muxed all at once
+        if packet.stream == video_stream:
+            mux_video_packet(packet)  # mutates packet timestamps
+        else:
+            mux_audio_packet(packet)  # mutates packet timestamps
 
     # Close stream
-    for buffer in outputs.values():
+    for buffer, _ in outputs.values():
         buffer.output.close()
     container.close()
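Segment rotation itself is unchanged in spirit: a segment closes on the first video keyframe arriving at least MIN_SEGMENT_DURATION after segment_start_pts, with the duration computed in seconds via the packet time base. Worked through with an assumed 90 kHz video time base:

    from fractions import Fraction

    MIN_SEGMENT_DURATION = 1.5
    time_base = Fraction(1, 90_000)  # typical RTSP video time base

    segment_start_pts = 0
    keyframe_pts = 162_000           # keyframe 1.8 s after the segment started

    segment_duration = (keyframe_pts - segment_start_pts) * time_base
    if segment_duration >= MIN_SEGMENT_DURATION:
        print(f"rotate segment: {float(segment_duration):.2f}s")  # rotate segment: 1.80s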
diff --git a/tests/components/stream/common.py b/tests/components/stream/common.py
index 354b94f7089..2359a771035 100644
--- a/tests/components/stream/common.py
+++ b/tests/components/stream/common.py
@@ -1,5 +1,7 @@
 """Collection of test helpers."""
+from fractions import Fraction
 import io
+import logging
 
 import av
 import numpy as np
@@ -7,27 +9,59 @@ import numpy as np
 from homeassistant.components.stream import Stream
 from homeassistant.components.stream.const import ATTR_STREAMS, DOMAIN
 
+_LOGGER = logging.getLogger(__name__)
 
-def generate_h264_video():
+AUDIO_SAMPLE_RATE = 8000
+
+
+def generate_h264_video(container_format="mp4", audio_codec=None):
     """
     Generate a test video.
 
     See: http://docs.mikeboers.com/pyav/develop/cookbook/numpy.html
     """
 
+    def generate_audio_frame(pcm_mulaw=False):
+        """Generate a blank audio frame."""
+        if pcm_mulaw:
+            audio_frame = av.AudioFrame(format="s16", layout="mono", samples=1)
+            audio_bytes = b"\x00\x00"
+        else:
+            audio_frame = av.AudioFrame(format="dbl", layout="mono", samples=1024)
+            audio_bytes = b"\x00\x00\x00\x00\x00\x00\x00\x00" * 1024
+        audio_frame.planes[0].update(audio_bytes)
+        audio_frame.sample_rate = AUDIO_SAMPLE_RATE
+        audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE)
+        return audio_frame
+
     duration = 5
     fps = 24
     total_frames = duration * fps
 
     output = io.BytesIO()
-    output.name = "test.mp4"
-    container = av.open(output, mode="w")
+    output.name = "test.mov" if container_format == "mov" else "test.mp4"
+    container = av.open(output, mode="w", format=container_format)
 
     stream = container.add_stream("libx264", rate=fps)
     stream.width = 480
     stream.height = 320
     stream.pix_fmt = "yuv420p"
 
+    a_packet = None
+    last_a_dts = -1
+    if audio_codec is not None:
+        if audio_codec == "empty":  # empty we add a stream but don't mux any audio
+            astream = container.add_stream("aac", AUDIO_SAMPLE_RATE)
+        else:
+            astream = container.add_stream(audio_codec, AUDIO_SAMPLE_RATE)
+        # Need to do it multiple times for some reason
+        while not a_packet:
+            a_packets = astream.encode(
+                generate_audio_frame(pcm_mulaw=audio_codec == "pcm_mulaw")
+            )
+            if a_packets:
+                a_packet = a_packets[0]
+
     for frame_i in range(total_frames):
 
         img = np.empty((480, 320, 3))
@@ -42,6 +76,17 @@ def generate_h264_video():
         for packet in stream.encode(frame):
             container.mux(packet)
 
+        if a_packet is not None:
+            a_packet.pts = int(frame_i / (fps * a_packet.time_base))
+            while a_packet.pts * a_packet.time_base * fps < frame_i + 1:
+                a_packet.dts = a_packet.pts
+                if (
+                    a_packet.dts > last_a_dts
+                ):  # avoid writing same dts twice in case of rounding
+                    container.mux(a_packet)
+                    last_a_dts = a_packet.dts
+                a_packet.pts += a_packet.duration
+
     # Flush stream
     for packet in stream.encode():
         container.mux(packet)
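The pts arithmetic in the test helper keeps the silent audio interleaved with the video: each iteration rewinds the audio packet to the current frame's start time, then muxes copies until the packet crosses into the next frame's window. With the helper's own numbers (24 fps video, 8 kHz audio):

    from fractions import Fraction

    fps = 24
    time_base = Fraction(1, 8000)  # audio packet time base at AUDIO_SAMPLE_RATE

    frame_i = 12
    a_pts = int(frame_i / (fps * time_base))
    assert a_pts == 4000  # frame 12 starts at 0.5 s = 4000 audio ticks
    assert a_pts * time_base * fps < frame_i + 1  # still inside frame 12's window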
diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py
index ff18cb66590..cb6a1c9d36f 100644
--- a/tests/components/stream/test_recorder.py
+++ b/tests/components/stream/test_recorder.py
@@ -2,6 +2,7 @@
 from datetime import timedelta
 from io import BytesIO
 
+import av
 import pytest
 
 from homeassistant.components.stream.core import Segment
@@ -75,7 +76,45 @@ async def test_recorder_save():
     output.name = "test.mp4"
 
     # Run
-    recorder_save_worker(output, [Segment(1, source, 4, (360000, 176400))])
+    recorder_save_worker(output, [Segment(1, source, 4)], "mp4")
 
     # Assert
     assert output.getvalue()
+
+
+@pytest.mark.skip("Flaky in CI")
+async def test_record_stream_audio(hass, hass_client):
+    """
+    Test treatment of different audio inputs.
+
+    Record stream output should have an audio channel when input has
+    a valid codec and audio packets and no audio channel otherwise.
+ """ + await async_setup_component(hass, "stream", {"stream": {}}) + + for a_codec, expected_audio_streams in ( + ("aac", 1), # aac is a valid mp4 codec + ("pcm_mulaw", 0), # G.711 is not a valid mp4 codec + ("empty", 0), # audio stream with no packets + (None, 0), # no audio stream + ): + with patch("homeassistant.components.stream.recorder.recorder_save_worker"): + # Setup demo track + source = generate_h264_video( + container_format="mov", audio_codec=a_codec + ) # mov can store PCM + stream = preload_stream(hass, source) + recorder = stream.add_provider("recorder") + stream.start() + + while True: + segment = await recorder.recv() + if not segment: + break + last_segment = segment + + result = av.open(last_segment.segment, "r", format="mp4") + + assert len(result.streams.audio) == expected_audio_streams + result.close() + stream.stop()