Add audio to stream (#38846)
* Add audio to stream component
* Use container options to do most fmp4 formatting
* Add test for treatment of different audio inputs
* pcm_mulaw frames should be s16
* Use seek to get BytesIO length
* Remove unused utcnow
* Remove peek_next_audio_pts
* Only demux audio and video packets, ignoring data and subtitle streams

Co-authored-by: Jason Hunter <hunterjm@gmail.com>

parent b0974d89a6
commit dc2d0b9297
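
Note on "Use seek to get BytesIO length": `BytesIO.seek(0, io.SEEK_END)` returns the new absolute position, i.e. the buffer length, without materializing the `memoryview` that `getbuffer()` returns (an outstanding view can also block resizing the buffer). A minimal standalone sketch of the two approaches, independent of the component code:

    import io

    buf = io.BytesIO(b"\x00" * 1024)

    # Old approach: create a memoryview just to take its length.
    length_via_view = len(buf.getbuffer())

    # New approach: seek() returns the new position, i.e. the total length.
    length_via_seek = buf.seek(0, io.SEEK_END)
    buf.seek(0)  # rewind so subsequent reads start at the beginning

    assert length_via_view == length_via_seek == 1024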
homeassistant/components/stream/const.py
@@ -15,7 +15,7 @@ OUTPUT_FORMATS = ["hls"]
 
 FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"}
 
-AUDIO_SAMPLE_RATE = 44100
-
 MAX_SEGMENTS = 3  # Max number of segments to keep around
 MIN_SEGMENT_DURATION = 1.5  # Each segment is at least this many seconds
+
+PACKETS_TO_WAIT_FOR_AUDIO = 20  # Some streams have an audio stream with no audio
homeassistant/components/stream/core.py
@@ -2,7 +2,7 @@
 import asyncio
 from collections import deque
 import io
-from typing import Any, List
+from typing import Any, Callable, List
 
 from aiohttp import web
 import attr
@@ -34,7 +34,6 @@ class Segment:
     sequence: int = attr.ib()
     segment: io.BytesIO = attr.ib()
     duration: float = attr.ib()
-    start_pts: tuple = attr.ib()
 
 
 class StreamOutput:
@@ -61,8 +60,8 @@ class StreamOutput:
         return None
 
     @property
-    def audio_codec(self) -> str:
-        """Return desired audio codec."""
+    def audio_codecs(self) -> str:
+        """Return desired audio codecs."""
         return None
 
     @property
@@ -71,8 +70,8 @@ class StreamOutput:
         return None
 
     @property
-    def container_options(self) -> dict:
-        """Return container options."""
+    def container_options(self) -> Callable[[int], dict]:
+        """Return Callable which takes a sequence number and returns container options."""
         return None
 
     @property
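
With `container_options` now typed as `Callable[[int], dict]`, a provider returns a factory keyed on the segment sequence number instead of a fixed dict, and a caller has to guard against the base-class default of `None` before invoking it. A hedged sketch of that calling convention (the helper name `resolve_options` is illustrative; the worker later in this commit inlines the same expression):

    from typing import Callable, Optional

    def resolve_options(
        container_options: Optional[Callable[[int], dict]], sequence: int
    ) -> dict:
        """Resolve a provider's container options for one segment (illustrative)."""
        return container_options(sequence) if container_options else {}

    def hls_options(sequence: int) -> dict:  # mirrors the lambda in hls.py below
        return {"fragment_index": str(sequence)}

    assert resolve_options(hls_options, 3) == {"fragment_index": "3"}
    assert resolve_options(None, 3) == {}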
homeassistant/components/stream/fmp4utils.py
@@ -5,7 +5,8 @@ import io
 def find_box(segment: io.BytesIO, target_type: bytes, box_start: int = 0) -> int:
     """Find location of first box (or sub_box if box_start provided) of given type."""
     if box_start == 0:
-        box_end = len(segment.getbuffer())
+        box_end = segment.seek(0, io.SEEK_END)
+        segment.seek(0)
         index = 0
     else:
         segment.seek(box_start)
@@ -29,22 +30,9 @@ def get_init(segment: io.BytesIO) -> bytes:
     return segment.read(moof_location)
 
 
-def get_m4s(segment: io.BytesIO, start_pts: tuple, sequence: int) -> bytes:
+def get_m4s(segment: io.BytesIO, sequence: int) -> bytes:
     """Get m4s section from fragmented mp4."""
     moof_location = next(find_box(segment, b"moof"))
     mfra_location = next(find_box(segment, b"mfra"))
-    # adjust mfhd sequence number in moof
-    view = segment.getbuffer()
-    view[moof_location + 20 : moof_location + 24] = sequence.to_bytes(4, "big")
-    # adjust tfdt in video traf
-    traf_finder = find_box(segment, b"traf", moof_location)
-    traf_location = next(traf_finder)
-    tfdt_location = next(find_box(segment, b"tfdt", traf_location))
-    view[tfdt_location + 12 : tfdt_location + 20] = start_pts[0].to_bytes(8, "big")
-    # adjust tfdt in audio traf
-    traf_location = next(traf_finder)
-    tfdt_location = next(find_box(segment, b"tfdt", traf_location))
-    view[tfdt_location + 12 : tfdt_location + 20] = start_pts[1].to_bytes(8, "big")
-    # done adjusting
     segment.seek(moof_location)
     return segment.read(mfra_location - moof_location)
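
For context on what `find_box` scans: a fragmented MP4 is a sequence of ISO BMFF boxes, each headed by a 4-byte big-endian size (which counts the 8-byte header itself) and a 4-byte type tag such as `moof` or `mfra`, so locating a box means hopping from size to size. A simplified, hedged sketch of that walk — unlike the component's generator it only handles top-level boxes and ignores the 64-bit `largesize` escape:

    import io
    import struct

    def walk_boxes(segment: io.BytesIO):
        """Yield (offset, type, size) for each top-level box (illustrative)."""
        end = segment.seek(0, io.SEEK_END)  # length via seek, as in find_box
        index = 0
        while index + 8 <= end:
            segment.seek(index)
            size = struct.unpack(">I", segment.read(4))[0]  # big-endian uint32
            box_type = segment.read(4)  # e.g. b"moof", b"mfra"
            if size < 8:  # sizes 0 and 1 are special cases this sketch skips
                break
            yield index, box_type, size
            index += size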
homeassistant/components/stream/hls.py
@@ -1,8 +1,9 @@
 """Provide functionality to stream HLS."""
+from typing import Callable
+
 from aiohttp import web
 
 from homeassistant.core import callback
-from homeassistant.util.dt import utcnow
 
 from .const import FORMAT_CONTENT_TYPE
 from .core import PROVIDERS, StreamOutput, StreamView
@@ -35,7 +36,7 @@ class HlsPlaylistView(StreamView):
             await track.recv()
         headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
         return web.Response(
-            body=renderer.render(track, utcnow()).encode("utf-8"), headers=headers
+            body=renderer.render(track).encode("utf-8"), headers=headers
         )
 
 
@@ -71,8 +72,7 @@ class HlsSegmentView(StreamView):
             return web.HTTPNotFound()
         headers = {"Content-Type": "video/iso.segment"}
         return web.Response(
-            body=get_m4s(segment.segment, segment.start_pts, int(sequence)),
-            headers=headers,
+            body=get_m4s(segment.segment, int(sequence)), headers=headers,
         )
 
 
@@ -90,11 +90,10 @@ class M3U8Renderer:
             "#EXT-X-VERSION:7",
             f"#EXT-X-TARGETDURATION:{track.target_duration}",
             '#EXT-X-MAP:URI="init.mp4"',
-            "#EXT-X-INDEPENDENT-SEGMENTS",
         ]
 
     @staticmethod
-    def render_playlist(track, start_time):
+    def render_playlist(track):
         """Render playlist."""
         segments = track.segments
 
@@ -114,13 +113,9 @@ class M3U8Renderer:
 
         return playlist
 
-    def render(self, track, start_time):
+    def render(self, track):
         """Render M3U8 file."""
-        lines = (
-            ["#EXTM3U"]
-            + self.render_preamble(track)
-            + self.render_playlist(track, start_time)
-        )
+        lines = ["#EXTM3U"] + self.render_preamble(track) + self.render_playlist(track)
         return "\n".join(lines) + "\n"
 
 
@@ -139,9 +134,9 @@ class HlsStreamOutput(StreamOutput):
         return "mp4"
 
     @property
-    def audio_codec(self) -> str:
-        """Return desired audio codec."""
-        return "aac"
+    def audio_codecs(self) -> str:
+        """Return desired audio codecs."""
+        return {"aac", "ac3", "mp3"}
 
     @property
     def video_codecs(self) -> tuple:
@@ -149,6 +144,10 @@ class HlsStreamOutput(StreamOutput):
         return {"hevc", "h264"}
 
     @property
-    def container_options(self) -> dict:
-        """Return container options."""
-        return {"movflags": "frag_custom+empty_moov+default_base_moof"}
+    def container_options(self) -> Callable[[int], dict]:
+        """Return Callable which takes a sequence number and returns container options."""
+        return lambda sequence: {
+            "movflags": "frag_custom+empty_moov+default_base_moof+skip_sidx+frag_discont",
+            "avoid_negative_ts": "make_non_negative",
+            "fragment_index": str(sequence),
+        }
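
The per-sequence options above are plain ffmpeg mp4 muxer settings: `frag_custom` lets the worker cut fragments itself, `empty_moov` produces an init segment with no samples, `default_base_moof` and `skip_sidx` keep fragments lean, `frag_discont` marks each segment as a discontinuity, and `fragment_index` seeds the `mfhd` sequence number — which is why `get_m4s` no longer patches those bytes by hand. A hedged sketch of how such options reach ffmpeg through PyAV (the buffer is illustrative; real code also adds streams and muxes packets before closing):

    import io
    import av

    buffer = io.BytesIO()
    output = av.open(
        buffer,
        mode="w",
        format="mp4",
        container_options={
            "movflags": "frag_custom+empty_moov+default_base_moof+skip_sidx+frag_discont",
            "avoid_negative_ts": "make_non_negative",
            "fragment_index": "1",  # becomes the mfhd sequence number
        },
    )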
homeassistant/components/stream/recorder.py
@@ -15,16 +15,17 @@ def async_setup_recorder(hass):
     """Only here so Provider Registry works."""
 
 
-def recorder_save_worker(file_out: str, segments: List[Segment]):
+def recorder_save_worker(file_out: str, segments: List[Segment], container_format: str):
     """Handle saving stream."""
-    first_pts = segments[0].start_pts[0]
-    output = av.open(file_out, "w")
+    first_pts = {"video": None, "audio": None}
+    output = av.open(file_out, "w", format=container_format)
     output_v = None
+    output_a = None
 
     for segment in segments:
         # Seek to beginning and open segment
         segment.segment.seek(0)
-        source = av.open(segment.segment, "r", format="mp4")
+        source = av.open(segment.segment, "r", format=container_format)
         source_v = source.streams.video[0]
 
         # Add output streams
@@ -32,16 +33,18 @@ def recorder_save_worker(file_out: str, segments: List[Segment]):
             output_v = output.add_stream(template=source_v)
             context = output_v.codec_context
             context.flags |= "GLOBAL_HEADER"
+        if not output_a and len(source.streams.audio) > 0:
+            source_a = source.streams.audio[0]
+            output_a = output.add_stream(template=source_a)
 
         # Remux video
-        for packet in source.demux(source_v):
+        for packet in source.demux():
             if packet is not None and packet.dts is not None:
-                if packet.pts < segment.start_pts[0]:
-                    packet.pts += segment.start_pts[0]
-                    packet.dts += segment.start_pts[0]
-                packet.pts -= first_pts
-                packet.dts -= first_pts
-                packet.stream = output_v
+                if first_pts[packet.stream.type] is None:
+                    first_pts[packet.stream.type] = packet.pts
+                packet.pts -= first_pts[packet.stream.type]
+                packet.dts -= first_pts[packet.stream.type]
+                packet.stream = output_v if packet.stream.type == "video" else output_a
                 output.mux(packet)
 
         source.close()
@@ -70,9 +73,9 @@ class RecorderOutput(StreamOutput):
         return "mp4"
 
     @property
-    def audio_codec(self) -> str:
+    def audio_codecs(self) -> str:
         """Return desired audio codec."""
-        return "aac"
+        return {"aac", "ac3", "mp3"}
 
     @property
     def video_codecs(self) -> tuple:
@@ -96,7 +99,7 @@ class RecorderOutput(StreamOutput):
         thread = threading.Thread(
             name="recorder_save_worker",
            target=recorder_save_worker,
-            args=(self.video_path, self._segments),
+            args=(self.video_path, self._segments, self.format),
         )
         thread.start()
 
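
The rewritten remux loop rebases every packet against the first pts seen for its stream type, so a recording always starts at zero on both tracks regardless of where the source segments started, and audio is offset independently of video. A self-contained sketch of just that rule (values illustrative):

    # Hedged sketch of the per-stream-type rebasing used in recorder_save_worker.
    first_pts = {"video": None, "audio": None}

    def rebase(stream_type: str, pts: int) -> int:
        """Shift a timestamp so each track starts at zero."""
        if first_pts[stream_type] is None:
            first_pts[stream_type] = pts
        return pts - first_pts[stream_type]

    assert rebase("video", 360000) == 0
    assert rebase("video", 450000) == 90000  # rebased against the video offset
    assert rebase("audio", 176400) == 0      # audio keeps its own offset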
homeassistant/components/stream/worker.py
@@ -1,54 +1,37 @@
 """Provides the worker thread needed for processing streams."""
-from fractions import Fraction
+from collections import deque
 import io
 import logging
 
 import av
 
-from .const import AUDIO_SAMPLE_RATE, MIN_SEGMENT_DURATION
+from .const import MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO
 from .core import Segment, StreamBuffer
 
 _LOGGER = logging.getLogger(__name__)
 
 
-def generate_audio_frame():
-    """Generate a blank audio frame."""
-    audio_frame = av.AudioFrame(format="dbl", layout="mono", samples=1024)
-    # audio_bytes = b''.join(b'\x00\x00\x00\x00\x00\x00\x00\x00'
-    #                        for i in range(0, 1024))
-    audio_bytes = b"\x00\x00\x00\x00\x00\x00\x00\x00" * 1024
-    audio_frame.planes[0].update(audio_bytes)
-    audio_frame.sample_rate = AUDIO_SAMPLE_RATE
-    audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE)
-    return audio_frame
-
-
-def create_stream_buffer(stream_output, video_stream, audio_frame):
+def create_stream_buffer(stream_output, video_stream, audio_stream, sequence):
     """Create a new StreamBuffer."""
-    a_packet = None
     segment = io.BytesIO()
+    container_options = (
+        stream_output.container_options(sequence)
+        if stream_output.container_options
+        else {}
+    )
     output = av.open(
         segment,
         mode="w",
         format=stream_output.format,
-        container_options={
-            "video_track_timescale": str(int(1 / video_stream.time_base)),
-            **(stream_output.container_options or {}),
-        },
+        container_options=container_options,
     )
     vstream = output.add_stream(template=video_stream)
     # Check if audio is requested
     astream = None
-    if stream_output.audio_codec:
-        astream = output.add_stream(stream_output.audio_codec, AUDIO_SAMPLE_RATE)
-        # Need to do it multiple times for some reason
-        while not a_packet:
-            a_packets = astream.encode(audio_frame)
-            if a_packets:
-                a_packet = a_packets[0]
-    return (a_packet, StreamBuffer(segment, output, vstream, astream))
+    if audio_stream and audio_stream.name in stream_output.audio_codecs:
+        astream = output.add_stream(template=audio_stream)
+    return StreamBuffer(segment, output, vstream, astream)
 
 
 def stream_worker(hass, stream, quit_event):
@@ -60,32 +43,133 @@ def stream_worker(hass, stream, quit_event):
     except (KeyError, IndexError):
         _LOGGER.error("Stream has no video")
         return
+    try:
+        audio_stream = container.streams.audio[0]
+    except (KeyError, IndexError):
+        audio_stream = None
 
-    audio_frame = generate_audio_frame()
-
-    first_packet = True
-    # Holds the buffers for each stream provider
-    outputs = {}
-    # Keep track of the number of segments we've processed
-    sequence = 1
-    # Holds the generated silence that needs to be muxed into the output
-    audio_packets = {}
-    # The presentation timestamp of the first video packet we receive
-    first_pts = 0
-    # The decoder timestamp of the latest packet we processed
+    # The presentation timestamps of the first packet in each stream we receive
+    # Use to adjust before muxing or outputting, but we don't adjust internally
+    first_pts = {}
+    # The decoder timestamps of the latest packet in each stream we processed
     last_dts = None
     # Keep track of consecutive packets without a dts to detect end of stream.
     last_packet_was_without_dts = False
-    # The pts at the beginning of the segment
-    segment_start_v_pts = 0
-    segment_start_a_pts = 0
+    # Holds the buffers for each stream provider
+    outputs = None
+    # Keep track of the number of segments we've processed
+    sequence = 0
+    # The video pts at the beginning of the segment
+    segment_start_pts = None
+    # Because of problems 1 and 2 below, we need to store the first few packets and replay them
+    initial_packets = deque()
+
+    # Have to work around two problems with RTSP feeds in ffmpeg
+    # 1 - first frame has bad pts/dts https://trac.ffmpeg.org/ticket/5018
+    # 2 - seeking can be problematic https://trac.ffmpeg.org/ticket/7815
+
+    def peek_first_pts():
+        nonlocal first_pts, audio_stream
+
+        def empty_stream_dict():
+            return {
+                video_stream: None,
+                **({audio_stream: None} if audio_stream else {}),
+            }
+
+        try:
+            first_packet = empty_stream_dict()
+            first_pts = empty_stream_dict()
+            # Get to first video keyframe
+            while first_packet[video_stream] is None:
+                packet = next(container.demux())
+                if packet.stream == video_stream and packet.is_keyframe:
+                    first_packet[video_stream] = packet
+                    initial_packets.append(packet)
+            # Get first_pts from subsequent frame to first keyframe
+            while any(
+                [pts is None for pts in {**first_packet, **first_pts}.values()]
+            ) and (len(initial_packets) < PACKETS_TO_WAIT_FOR_AUDIO):
+                packet = next(container.demux((video_stream, audio_stream)))
+                if (
+                    first_packet[packet.stream] is None
+                ):  # actually video already found above so only for audio
+                    if packet.is_keyframe:
+                        first_packet[packet.stream] = packet
+                    else:  # Discard leading non-keyframes
+                        continue
+                else:  # This is the second frame to calculate first_pts from
+                    if first_pts[packet.stream] is None:
+                        first_pts[packet.stream] = packet.dts - packet.duration
+                        first_packet[packet.stream].pts = first_pts[packet.stream]
+                        first_packet[packet.stream].dts = first_pts[packet.stream]
+                initial_packets.append(packet)
+            if audio_stream and first_packet[audio_stream] is None:
+                _LOGGER.warning(
+                    "Audio stream not found"
+                )  # Some streams declare an audio stream and never send any packets
+                del first_pts[audio_stream]
+                audio_stream = None
+
+        except (av.AVError, StopIteration) as ex:
+            # End of stream, clear listeners and stop thread
+            for fmt, _ in outputs.items():
+                hass.loop.call_soon_threadsafe(stream.outputs[fmt].put, None)
+            _LOGGER.error(
+                "Error demuxing stream while finding first packet: %s", str(ex)
+            )
+            quit_event.set()
+
+    def initialize_segment(video_pts):
+        """Reset some variables and initialize outputs for each segment."""
+        nonlocal outputs, sequence, segment_start_pts
+        # Clear outputs and increment sequence
+        outputs = {}
+        sequence += 1
+        segment_start_pts = video_pts
+        for stream_output in stream.outputs.values():
+            if video_stream.name not in stream_output.video_codecs:
+                continue
+            buffer = create_stream_buffer(
+                stream_output, video_stream, audio_stream, sequence
+            )
+            outputs[stream_output.name] = (
+                buffer,
+                {video_stream: buffer.vstream, audio_stream: buffer.astream},
+            )
+
+    def mux_video_packet(packet):
+        # adjust pts and dts before muxing
+        packet.pts -= first_pts[video_stream]
+        packet.dts -= first_pts[video_stream]
+        # mux packets to each buffer
+        for buffer, output_streams in outputs.values():
+            # Assign the packet to the new stream & mux
+            packet.stream = output_streams[video_stream]
+            buffer.output.mux(packet)
+
+    def mux_audio_packet(packet):
+        # almost the same as muxing video but add extra check
+        # adjust pts and dts before muxing
+        packet.pts -= first_pts[audio_stream]
+        packet.dts -= first_pts[audio_stream]
+        for buffer, output_streams in outputs.values():
+            # Assign the packet to the new stream & mux
+            if output_streams.get(audio_stream):
+                packet.stream = output_streams[audio_stream]
+                buffer.output.mux(packet)
+
+    peek_first_pts()
+    last_dts = {k: v - 1 for k, v in first_pts.items()}
+    initialize_segment(first_pts[video_stream])
 
     while not quit_event.is_set():
         try:
-            packet = next(container.demux(video_stream))
+            if len(initial_packets) > 0:
+                packet = initial_packets.popleft()
+            else:
+                packet = next(container.demux((video_stream, audio_stream)))
            if packet.dts is None:
-                if first_packet:
-                    continue
                 _LOGGER.error("Stream packet without dts detected, skipping...")
                 # Allow a single packet without dts before terminating the stream.
                 if last_packet_was_without_dts:
@@ -101,101 +185,35 @@ def stream_worker(hass, stream, quit_event):
             _LOGGER.error("Error demuxing stream: %s", str(ex))
             break
 
-        # Skip non monotonically increasing dts in feed
-        if not first_packet and last_dts >= packet.dts:
+        # Discard packet if dts is not monotonic
+        if packet.dts <= last_dts[packet.stream]:
             continue
-        last_dts = packet.dts
 
-        # Reset timestamps from a 0 time base for this stream
-        packet.dts -= first_pts
-        packet.pts -= first_pts
-
-        # Reset segment on keyframe after we reach desired segment duration
-        if (
-            packet.is_keyframe
-            and (packet.pts - segment_start_v_pts) * packet.time_base
-            >= MIN_SEGMENT_DURATION
-        ):
-            # Calculate the segment duration by multiplying the difference of the next and the current
-            # keyframe presentation timestamps by the time base, which gets us total seconds.
-            segment_duration = (packet.pts - segment_start_v_pts) * packet.time_base
-            # Save segment to outputs
-            for fmt, buffer in outputs.items():
-                buffer.output.close()
-                del audio_packets[buffer.astream]
-                if stream.outputs.get(fmt):
-                    hass.loop.call_soon_threadsafe(
-                        stream.outputs[fmt].put,
-                        Segment(
-                            sequence,
-                            buffer.segment,
-                            segment_duration,
-                            (segment_start_v_pts, segment_start_a_pts),
-                        ),
-                    )
-
-            # Clear outputs and increment sequence
-            outputs = {}
-            if not first_packet:
-                sequence += 1
-                segment_start_v_pts = packet.pts
-                segment_start_a_pts = int(
-                    packet.pts * packet.time_base * AUDIO_SAMPLE_RATE
-                )
-
-            # Initialize outputs
-            for stream_output in stream.outputs.values():
-                if video_stream.name not in stream_output.video_codecs:
-                    continue
-
-                a_packet, buffer = create_stream_buffer(
-                    stream_output, video_stream, audio_frame
-                )
-                audio_packets[buffer.astream] = a_packet
-                outputs[stream_output.name] = buffer
-
-        # First video packet tends to have a weird dts/pts
-        if first_packet:
-            # If we are attaching to a live stream that does not reset
-            # timestamps for us, we need to do it ourselves by recording
-            # the first presentation timestamp and subtracting it from
-            # subsequent packets we receive.
-            if (packet.pts * packet.time_base) > 1:
-                first_pts = packet.pts
-            packet.dts = 0
-            packet.pts = 0
-            first_packet = False
-
-        # Store packets on each output
-        for buffer in outputs.values():
-            # Check if the format requires audio
-            if audio_packets.get(buffer.astream):
-                a_packet = audio_packets[buffer.astream]
-                a_time_base = a_packet.time_base
-
-                # Determine video start timestamp and duration
-                video_start = packet.pts * packet.time_base
-                video_duration = packet.duration * packet.time_base
-
-                if packet.is_keyframe:
-                    # Set first audio packet in sequence to equal video pts
-                    a_packet.pts = int(video_start / a_time_base)
-                    a_packet.dts = int(video_start / a_time_base)
-
-                # Determine target end timestamp for audio
-                target_pts = int((video_start + video_duration) / a_time_base)
-                while a_packet.pts < target_pts:
-                    # Mux audio packet and adjust points until target hit
-                    buffer.output.mux(a_packet)
-                    a_packet.pts += a_packet.duration
-                    a_packet.dts += a_packet.duration
-                audio_packets[buffer.astream] = a_packet
-
-            # Assign the video packet to the new stream & mux
-            packet.stream = buffer.vstream
-            buffer.output.mux(packet)
+        # Check for end of segment
+        if packet.stream == video_stream and packet.is_keyframe:
+            segment_duration = (packet.pts - segment_start_pts) * packet.time_base
+            if segment_duration >= MIN_SEGMENT_DURATION:
+                # Save segment to outputs
+                for fmt, (buffer, _) in outputs.items():
+                    buffer.output.close()
+                    if stream.outputs.get(fmt):
+                        hass.loop.call_soon_threadsafe(
+                            stream.outputs[fmt].put,
+                            Segment(sequence, buffer.segment, segment_duration,),
+                        )
+
+                # Reinitialize
+                initialize_segment(packet.pts)
+
+        # Update last_dts processed
+        last_dts[packet.stream] = packet.dts
+        # mux video packets immediately, save audio packets to be muxed all at once
+        if packet.stream == video_stream:
+            mux_video_packet(packet)  # mutates packet timestamps
+        else:
+            mux_audio_packet(packet)  # mutates packet timestamps
 
     # Close stream
-    for buffer in outputs.values():
+    for buffer, _ in outputs.values():
         buffer.output.close()
     container.close()
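
The subtlest line in `peek_first_pts` is `first_pts[packet.stream] = packet.dts - packet.duration`: because the first frame of an RTSP feed often carries a bad pts/dts (the ffmpeg ticket 5018 workaround above), the worker waits for the second packet and extrapolates one frame backwards. As a worked example under assumed numbers: at 24 fps in a 90 kHz time base each frame lasts 90000 / 24 = 3750 ticks, so a second packet arriving with dts 363750 implies the keyframe before it started at 363750 - 3750 = 360000, and that value is written back onto the stored first packet before it is replayed from `initial_packets`.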
tests/components/stream/common.py
@@ -1,5 +1,7 @@
 """Collection of test helpers."""
+from fractions import Fraction
 import io
+import logging
 
 import av
 import numpy as np
@@ -7,27 +9,59 @@ import numpy as np
 from homeassistant.components.stream import Stream
 from homeassistant.components.stream.const import ATTR_STREAMS, DOMAIN
 
+_LOGGER = logging.getLogger(__name__)
+
+AUDIO_SAMPLE_RATE = 8000
+
 
-def generate_h264_video():
+def generate_h264_video(container_format="mp4", audio_codec=None):
     """
     Generate a test video.
 
     See: http://docs.mikeboers.com/pyav/develop/cookbook/numpy.html
     """
+
+    def generate_audio_frame(pcm_mulaw=False):
+        """Generate a blank audio frame."""
+        if pcm_mulaw:
+            audio_frame = av.AudioFrame(format="s16", layout="mono", samples=1)
+            audio_bytes = b"\x00\x00"
+        else:
+            audio_frame = av.AudioFrame(format="dbl", layout="mono", samples=1024)
+            audio_bytes = b"\x00\x00\x00\x00\x00\x00\x00\x00" * 1024
+        audio_frame.planes[0].update(audio_bytes)
+        audio_frame.sample_rate = AUDIO_SAMPLE_RATE
+        audio_frame.time_base = Fraction(1, AUDIO_SAMPLE_RATE)
+        return audio_frame
+
     duration = 5
     fps = 24
     total_frames = duration * fps
 
     output = io.BytesIO()
-    output.name = "test.mp4"
-    container = av.open(output, mode="w")
+    output.name = "test.mov" if container_format == "mov" else "test.mp4"
+    container = av.open(output, mode="w", format=container_format)
 
     stream = container.add_stream("libx264", rate=fps)
     stream.width = 480
     stream.height = 320
     stream.pix_fmt = "yuv420p"
 
+    a_packet = None
+    last_a_dts = -1
+    if audio_codec is not None:
+        if audio_codec == "empty":  # empty we add a stream but don't mux any audio
+            astream = container.add_stream("aac", AUDIO_SAMPLE_RATE)
+        else:
+            astream = container.add_stream(audio_codec, AUDIO_SAMPLE_RATE)
+        # Need to do it multiple times for some reason
+        while not a_packet:
+            a_packets = astream.encode(
+                generate_audio_frame(pcm_mulaw=audio_codec == "pcm_mulaw")
+            )
+            if a_packets:
+                a_packet = a_packets[0]
+
     for frame_i in range(total_frames):
@@ -42,6 +76,17 @@ def generate_h264_video():
         for packet in stream.encode(frame):
             container.mux(packet)
 
+        if a_packet is not None:
+            a_packet.pts = int(frame_i / (fps * a_packet.time_base))
+            while a_packet.pts * a_packet.time_base * fps < frame_i + 1:
+                a_packet.dts = a_packet.pts
+                if (
+                    a_packet.dts > last_a_dts
+                ):  # avoid writing same dts twice in case of rounding
+                    container.mux(a_packet)
+                    last_a_dts = a_packet.dts
+                a_packet.pts += a_packet.duration
+
     # Flush stream
     for packet in stream.encode():
         container.mux(packet)
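
Usage note: the recorder test below drives this helper with a QuickTime container when the audio is raw, e.g. `generate_h264_video(container_format="mov", audio_codec="pcm_mulaw")`, since mov can carry G.711 PCM while mp4 cannot; the "empty" codec value exercises the declared-but-silent audio stream case that PACKETS_TO_WAIT_FOR_AUDIO guards against.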
tests/components/stream/test_recorder.py
@@ -2,6 +2,7 @@
 from datetime import timedelta
 from io import BytesIO
 
+import av
 import pytest
 
 from homeassistant.components.stream.core import Segment
@@ -75,7 +76,45 @@
     output.name = "test.mp4"
 
     # Run
-    recorder_save_worker(output, [Segment(1, source, 4, (360000, 176400))])
+    recorder_save_worker(output, [Segment(1, source, 4)], "mp4")
 
     # Assert
     assert output.getvalue()
+
+
+@pytest.mark.skip("Flaky in CI")
+async def test_record_stream_audio(hass, hass_client):
+    """
+    Test treatment of different audio inputs.
+
+    Record stream output should have an audio channel when input has
+    a valid codec and audio packets and no audio channel otherwise.
+    """
+    await async_setup_component(hass, "stream", {"stream": {}})
+
+    for a_codec, expected_audio_streams in (
+        ("aac", 1),  # aac is a valid mp4 codec
+        ("pcm_mulaw", 0),  # G.711 is not a valid mp4 codec
+        ("empty", 0),  # audio stream with no packets
+        (None, 0),  # no audio stream
+    ):
+        with patch("homeassistant.components.stream.recorder.recorder_save_worker"):
+            # Setup demo track
+            source = generate_h264_video(
+                container_format="mov", audio_codec=a_codec
+            )  # mov can store PCM
+            stream = preload_stream(hass, source)
+            recorder = stream.add_provider("recorder")
+            stream.start()
+
+            while True:
+                segment = await recorder.recv()
+                if not segment:
+                    break
+                last_segment = segment
+
+            result = av.open(last_segment.segment, "r", format="mp4")
+
+            assert len(result.streams.audio) == expected_audio_streams
+            result.close()
+            stream.stop()