Change stream sequence number to start from 0 (#51101)

* Use constants for provider strings

* Add last_sequence property
This commit is contained in:
uvjustin 2021-05-27 11:22:31 +08:00 committed by GitHub
parent f45bc3abc7
commit 38e0cbe964
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 87 additions and 58 deletions

View File

@ -31,8 +31,10 @@ from .const import (
ATTR_ENDPOINTS,
ATTR_STREAMS,
DOMAIN,
HLS_PROVIDER,
MAX_SEGMENTS,
OUTPUT_IDLE_TIMEOUT,
RECORDER_PROVIDER,
STREAM_RESTART_INCREMENT,
STREAM_RESTART_RESET_TIME,
)
@ -90,7 +92,7 @@ async def async_setup(hass, config):
# Setup HLS
hls_endpoint = async_setup_hls(hass)
hass.data[DOMAIN][ATTR_ENDPOINTS]["hls"] = hls_endpoint
hass.data[DOMAIN][ATTR_ENDPOINTS][HLS_PROVIDER] = hls_endpoint
# Setup Recorder
async_setup_recorder(hass)
@ -146,7 +148,9 @@ class Stream:
@callback
def idle_callback():
if (not self.keepalive or fmt == "recorder") and fmt in self._outputs:
if (
not self.keepalive or fmt == RECORDER_PROVIDER
) and fmt in self._outputs:
self.remove_provider(self._outputs[fmt])
self.check_idle()
@ -259,19 +263,19 @@ class Stream:
raise HomeAssistantError(f"Can't write {video_path}, no access to path!")
# Add recorder
recorder = self.outputs().get("recorder")
recorder = self.outputs().get(RECORDER_PROVIDER)
if recorder:
raise HomeAssistantError(
f"Stream already recording to {recorder.video_path}!"
)
recorder = self.add_provider("recorder", timeout=duration)
recorder = self.add_provider(RECORDER_PROVIDER, timeout=duration)
recorder.video_path = video_path
self.start()
_LOGGER.debug("Started a stream recording of %s seconds", duration)
# Take advantage of lookback
hls = self.outputs().get("hls")
hls = self.outputs().get(HLS_PROVIDER)
if lookback > 0 and hls:
num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS)
# Wait for latest segment, then add the lookback

View File

@ -4,13 +4,16 @@ DOMAIN = "stream"
ATTR_ENDPOINTS = "endpoints"
ATTR_STREAMS = "streams"
OUTPUT_FORMATS = ["hls"]
HLS_PROVIDER = "hls"
RECORDER_PROVIDER = "recorder"
OUTPUT_FORMATS = [HLS_PROVIDER]
SEGMENT_CONTAINER_FORMAT = "mp4" # format for segments
RECORDER_CONTAINER_FORMAT = "mp4" # format for recorder output
AUDIO_CODECS = {"aac", "mp3"}
FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"}
FORMAT_CONTENT_TYPE = {HLS_PROVIDER: "application/vnd.apple.mpegurl"}
OUTPUT_IDLE_TIMEOUT = 300 # Idle timeout due to inactivity

View File

@ -97,6 +97,13 @@ class StreamOutput:
"""Return True if the output is idle."""
return self._idle_timer.idle
@property
def last_sequence(self) -> int:
"""Return the last sequence number without iterating."""
if self._segments:
return self._segments[-1].sequence
return -1
@property
def segments(self) -> list[int]:
"""Return current sequence from segments."""
@ -127,8 +134,7 @@ class StreamOutput:
async def recv(self) -> Segment | None:
"""Wait for and retrieve the latest segment."""
last_segment = max(self.segments, default=0)
if self._cursor is None or self._cursor <= last_segment:
if self._cursor is None or self._cursor <= self.last_sequence:
await self._event.wait()
if not self._segments:

View File

@ -3,7 +3,12 @@ from aiohttp import web
from homeassistant.core import callback
from .const import FORMAT_CONTENT_TYPE, MAX_SEGMENTS, NUM_PLAYLIST_SEGMENTS
from .const import (
FORMAT_CONTENT_TYPE,
HLS_PROVIDER,
MAX_SEGMENTS,
NUM_PLAYLIST_SEGMENTS,
)
from .core import PROVIDERS, HomeAssistant, IdleTimer, StreamOutput, StreamView
from .fmp4utils import get_codec_string
@ -45,12 +50,12 @@ class HlsMasterPlaylistView(StreamView):
async def handle(self, request, stream, sequence):
"""Return m3u8 playlist."""
track = stream.add_provider("hls")
track = stream.add_provider(HLS_PROVIDER)
stream.start()
# Wait for a segment to be ready
if not track.segments and not await track.recv():
return web.HTTPNotFound()
headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]}
return web.Response(body=self.render(track).encode("utf-8"), headers=headers)
@ -104,12 +109,12 @@ class HlsPlaylistView(StreamView):
async def handle(self, request, stream, sequence):
"""Return m3u8 playlist."""
track = stream.add_provider("hls")
track = stream.add_provider(HLS_PROVIDER)
stream.start()
# Wait for a segment to be ready
if not track.segments and not await track.recv():
return web.HTTPNotFound()
headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]}
headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]}
return web.Response(body=self.render(track).encode("utf-8"), headers=headers)
@ -122,7 +127,7 @@ class HlsInitView(StreamView):
async def handle(self, request, stream, sequence):
"""Return init.mp4."""
track = stream.add_provider("hls")
track = stream.add_provider(HLS_PROVIDER)
segments = track.get_segments()
if not segments:
return web.HTTPNotFound()
@ -139,7 +144,7 @@ class HlsSegmentView(StreamView):
async def handle(self, request, stream, sequence):
"""Return fmp4 segment."""
track = stream.add_provider("hls")
track = stream.add_provider(HLS_PROVIDER)
segment = track.get_segment(int(sequence))
if not segment:
return web.HTTPNotFound()
@ -150,7 +155,7 @@ class HlsSegmentView(StreamView):
)
@PROVIDERS.register("hls")
@PROVIDERS.register(HLS_PROVIDER)
class HlsStreamOutput(StreamOutput):
"""Represents HLS Output formats."""
@ -161,4 +166,4 @@ class HlsStreamOutput(StreamOutput):
@property
def name(self) -> str:
"""Return provider name."""
return "hls"
return HLS_PROVIDER

View File

@ -12,7 +12,11 @@ from av.container import OutputContainer
from homeassistant.core import HomeAssistant, callback
from .const import RECORDER_CONTAINER_FORMAT, SEGMENT_CONTAINER_FORMAT
from .const import (
RECORDER_CONTAINER_FORMAT,
RECORDER_PROVIDER,
SEGMENT_CONTAINER_FORMAT,
)
from .core import PROVIDERS, IdleTimer, Segment, StreamOutput
_LOGGER = logging.getLogger(__name__)
@ -110,7 +114,7 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]):
output.close()
@PROVIDERS.register("recorder")
@PROVIDERS.register(RECORDER_PROVIDER)
class RecorderOutput(StreamOutput):
"""Represents HLS Output formats."""
@ -122,7 +126,7 @@ class RecorderOutput(StreamOutput):
@property
def name(self) -> str:
"""Return provider name."""
return "recorder"
return RECORDER_PROVIDER
def prepend(self, segments: list[Segment]) -> None:
"""Prepend segments to existing list."""

View File

@ -32,7 +32,9 @@ class SegmentBuffer:
self._stream_id = 0
self._outputs_callback = outputs_callback
self._outputs: list[StreamOutput] = []
self._sequence = 0
# sequence gets incremented before the first segment so the first segment
# has a sequence number of 0.
self._sequence = -1
self._segment_start_pts = None
self._memory_file: BytesIO = cast(BytesIO, None)
self._av_output: av.container.OutputContainer = None

View File

@ -7,7 +7,11 @@ import av
import pytest
from homeassistant.components.stream import create_stream
from homeassistant.components.stream.const import MAX_SEGMENTS, NUM_PLAYLIST_SEGMENTS
from homeassistant.components.stream.const import (
HLS_PROVIDER,
MAX_SEGMENTS,
NUM_PLAYLIST_SEGMENTS,
)
from homeassistant.components.stream.core import Segment
from homeassistant.const import HTTP_NOT_FOUND
from homeassistant.setup import async_setup_component
@ -47,7 +51,7 @@ def hls_stream(hass, hass_client):
async def create_client_for_stream(stream):
http_client = await hass_client()
parsed_url = urlparse(stream.endpoint_url("hls"))
parsed_url = urlparse(stream.endpoint_url(HLS_PROVIDER))
return HlsClient(http_client, parsed_url)
return create_client_for_stream
@ -93,7 +97,7 @@ async def test_hls_stream(hass, hls_stream, stream_worker_sync):
stream = create_stream(hass, source)
# Request stream
stream.add_provider("hls")
stream.add_provider(HLS_PROVIDER)
stream.start()
hls_client = await hls_stream(stream)
@ -134,9 +138,9 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync):
stream = create_stream(hass, source)
# Request stream
stream.add_provider("hls")
stream.add_provider(HLS_PROVIDER)
stream.start()
url = stream.endpoint_url("hls")
url = stream.endpoint_url(HLS_PROVIDER)
http_client = await hass_client()
@ -176,7 +180,7 @@ async def test_stream_timeout_after_stop(hass, hass_client, stream_worker_sync):
stream = create_stream(hass, source)
# Request stream
stream.add_provider("hls")
stream.add_provider(HLS_PROVIDER)
stream.start()
stream_worker_sync.resume()
@ -196,7 +200,7 @@ async def test_stream_keepalive(hass):
# Setup demo HLS track
source = "test_stream_keepalive_source"
stream = create_stream(hass, source)
track = stream.add_provider("hls")
track = stream.add_provider(HLS_PROVIDER)
track.num_segments = 2
cur_time = 0
@ -231,7 +235,7 @@ async def test_hls_playlist_view_no_output(hass, hass_client, hls_stream):
await async_setup_component(hass, "stream", {"stream": {}})
stream = create_stream(hass, STREAM_SOURCE)
stream.add_provider("hls")
stream.add_provider(HLS_PROVIDER)
hls_client = await hls_stream(stream)
@ -246,7 +250,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync):
stream = create_stream(hass, STREAM_SOURCE)
stream_worker_sync.pause()
hls = stream.add_provider("hls")
hls = stream.add_provider(HLS_PROVIDER)
hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION))
await hass.async_block_till_done()
@ -275,7 +279,7 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync):
stream = create_stream(hass, STREAM_SOURCE)
stream_worker_sync.pause()
hls = stream.add_provider("hls")
hls = stream.add_provider(HLS_PROVIDER)
hls_client = await hls_stream(stream)
@ -316,7 +320,7 @@ async def test_hls_playlist_view_discontinuity(hass, hls_stream, stream_worker_s
stream = create_stream(hass, STREAM_SOURCE)
stream_worker_sync.pause()
hls = stream.add_provider("hls")
hls = stream.add_provider(HLS_PROVIDER)
hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0))
hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0))
@ -346,7 +350,7 @@ async def test_hls_max_segments_discontinuity(hass, hls_stream, stream_worker_sy
stream = create_stream(hass, STREAM_SOURCE)
stream_worker_sync.pause()
hls = stream.add_provider("hls")
hls = stream.add_provider(HLS_PROVIDER)
hls_client = await hls_stream(stream)

View File

@ -15,6 +15,7 @@ import av
import pytest
from homeassistant.components.stream import create_stream
from homeassistant.components.stream.const import HLS_PROVIDER, RECORDER_PROVIDER
from homeassistant.components.stream.core import Segment
from homeassistant.components.stream.fmp4utils import get_init_and_moof_data
from homeassistant.components.stream.recorder import recorder_save_worker
@ -119,7 +120,7 @@ async def test_record_lookback(
stream = create_stream(hass, source)
# Start an HLS feed to enable lookback
stream.add_provider("hls")
stream.add_provider(HLS_PROVIDER)
stream.start()
with patch.object(hass.config, "is_allowed_path", return_value=True):
@ -148,7 +149,7 @@ async def test_recorder_timeout(hass, hass_client, stream_worker_sync):
stream = create_stream(hass, source)
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder = stream.add_provider("recorder")
recorder = stream.add_provider(RECORDER_PROVIDER)
await recorder.recv()
@ -252,7 +253,7 @@ async def test_record_stream_audio(
stream = create_stream(hass, source)
with patch.object(hass.config, "is_allowed_path", return_value=True):
await stream.async_record("/example/path")
recorder = stream.add_provider("recorder")
recorder = stream.add_provider(RECORDER_PROVIDER)
while True:
segment = await recorder.recv()

View File

@ -23,6 +23,7 @@ import av
from homeassistant.components.stream import Stream
from homeassistant.components.stream.const import (
HLS_PROVIDER,
MAX_MISSING_DTS,
MIN_SEGMENT_DURATION,
PACKETS_TO_WAIT_FOR_AUDIO,
@ -31,7 +32,6 @@ from homeassistant.components.stream.worker import SegmentBuffer, stream_worker
STREAM_SOURCE = "some-stream-source"
# Formats here are arbitrary, not exercised by tests
STREAM_OUTPUT_FORMAT = "hls"
AUDIO_STREAM_FORMAT = "mp3"
VIDEO_STREAM_FORMAT = "h264"
VIDEO_FRAME_RATE = 12
@ -198,7 +198,7 @@ class MockPyAv:
async def async_decode_stream(hass, packets, py_av=None):
"""Start a stream worker that decodes incoming stream packets into output segments."""
stream = Stream(hass, STREAM_SOURCE)
stream.add_provider(STREAM_OUTPUT_FORMAT)
stream.add_provider(HLS_PROVIDER)
if not py_av:
py_av = MockPyAv()
@ -218,7 +218,7 @@ async def async_decode_stream(hass, packets, py_av=None):
async def test_stream_open_fails(hass):
"""Test failure on stream open."""
stream = Stream(hass, STREAM_SOURCE)
stream.add_provider(STREAM_OUTPUT_FORMAT)
stream.add_provider(HLS_PROVIDER)
with patch("av.open") as av_open:
av_open.side_effect = av.error.InvalidDataError(-2, "error")
segment_buffer = SegmentBuffer(stream.outputs)
@ -237,9 +237,9 @@ async def test_stream_worker_success(hass):
# segment arrives, hence the subtraction of one from the sequence length.
assert len(segments) == int((TEST_SEQUENCE_LENGTH - 1) * SEGMENTS_PER_PACKET)
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
assert all(segments[i].sequence == i for i in range(len(segments)))
# Check segment durations
assert all([s.duration == SEGMENT_DURATION for s in segments])
assert all(s.duration == SEGMENT_DURATION for s in segments)
assert len(decoded_stream.video_packets) == TEST_SEQUENCE_LENGTH
assert len(decoded_stream.audio_packets) == 0
@ -253,7 +253,7 @@ async def test_skip_out_of_order_packet(hass):
decoded_stream = await async_decode_stream(hass, iter(packets))
segments = decoded_stream.segments
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
assert all(segments[i].sequence == i for i in range(len(segments)))
# If skipped packet would have been the first packet of a segment, the previous
# segment will be longer by a packet duration
# We also may possibly lose a segment due to the shifting pts boundary
@ -273,7 +273,7 @@ async def test_skip_out_of_order_packet(hass):
# Check number of segments
assert len(segments) == int((len(packets) - 1) * SEGMENTS_PER_PACKET)
# Check remaining segment durations
assert all([s.duration == SEGMENT_DURATION for s in segments])
assert all(s.duration == SEGMENT_DURATION for s in segments)
assert len(decoded_stream.video_packets) == len(packets) - 1
assert len(decoded_stream.audio_packets) == 0
@ -290,9 +290,9 @@ async def test_discard_old_packets(hass):
# Check number of segments
assert len(segments) == int((OUT_OF_ORDER_PACKET_INDEX - 1) * SEGMENTS_PER_PACKET)
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
assert all(segments[i].sequence == i for i in range(len(segments)))
# Check segment durations
assert all([s.duration == SEGMENT_DURATION for s in segments])
assert all(s.duration == SEGMENT_DURATION for s in segments)
assert len(decoded_stream.video_packets) == OUT_OF_ORDER_PACKET_INDEX
assert len(decoded_stream.audio_packets) == 0
@ -309,9 +309,9 @@ async def test_packet_overflow(hass):
# Check number of segments
assert len(segments) == int((OUT_OF_ORDER_PACKET_INDEX - 1) * SEGMENTS_PER_PACKET)
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
assert all(segments[i].sequence == i for i in range(len(segments)))
# Check segment durations
assert all([s.duration == SEGMENT_DURATION for s in segments])
assert all(s.duration == SEGMENT_DURATION for s in segments)
assert len(decoded_stream.video_packets) == OUT_OF_ORDER_PACKET_INDEX
assert len(decoded_stream.audio_packets) == 0
@ -332,9 +332,9 @@ async def test_skip_initial_bad_packets(hass):
(num_packets - num_bad_packets - 1) * SEGMENTS_PER_PACKET
)
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
assert all(segments[i].sequence == i for i in range(len(segments)))
# Check segment durations
assert all([s.duration == SEGMENT_DURATION for s in segments])
assert all(s.duration == SEGMENT_DURATION for s in segments)
assert len(decoded_stream.video_packets) == num_packets - num_bad_packets
assert len(decoded_stream.audio_packets) == 0
@ -368,8 +368,8 @@ async def test_skip_missing_dts(hass):
decoded_stream = await async_decode_stream(hass, iter(packets))
segments = decoded_stream.segments
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
# Check segment durations (not counting the elongated segment)
assert all(segments[i].sequence == i for i in range(len(segments)))
# Check segment durations (not counting the last segment)
assert (
sum([segments[i].duration == SEGMENT_DURATION for i in range(len(segments))])
>= len(segments) - 1
@ -495,9 +495,9 @@ async def test_pts_out_of_order(hass):
# Check number of segments
assert len(segments) == int((TEST_SEQUENCE_LENGTH - 1) * SEGMENTS_PER_PACKET)
# Check sequence numbers
assert all([segments[i].sequence == i + 1 for i in range(len(segments))])
assert all(segments[i].sequence == i for i in range(len(segments)))
# Check segment durations
assert all([s.duration == SEGMENT_DURATION for s in segments])
assert all(s.duration == SEGMENT_DURATION for s in segments)
assert len(decoded_stream.video_packets) == len(packets)
assert len(decoded_stream.audio_packets) == 0
@ -512,7 +512,7 @@ async def test_stream_stopped_while_decoding(hass):
worker_wake = threading.Event()
stream = Stream(hass, STREAM_SOURCE)
stream.add_provider(STREAM_OUTPUT_FORMAT)
stream.add_provider(HLS_PROVIDER)
py_av = MockPyAv()
py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH)
@ -539,7 +539,7 @@ async def test_update_stream_source(hass):
worker_wake = threading.Event()
stream = Stream(hass, STREAM_SOURCE)
stream.add_provider(STREAM_OUTPUT_FORMAT)
stream.add_provider(HLS_PROVIDER)
# Note that keepalive is not set here. The stream is "restarted" even though
# it is not stopping due to failure.
@ -572,14 +572,14 @@ async def test_update_stream_source(hass):
assert last_stream_source == STREAM_SOURCE + "-updated-source"
worker_wake.set()
# Ccleanup
# Cleanup
stream.stop()
async def test_worker_log(hass, caplog):
"""Test that the worker logs the url without username and password."""
stream = Stream(hass, "https://abcd:efgh@foo.bar")
stream.add_provider(STREAM_OUTPUT_FORMAT)
stream.add_provider(HLS_PROVIDER)
with patch("av.open") as av_open:
av_open.side_effect = av.error.InvalidDataError(-2, "error")
segment_buffer = SegmentBuffer(stream.outputs)