diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 67bfe404d7d..f71e725fcb4 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -31,8 +31,10 @@ from .const import ( ATTR_ENDPOINTS, ATTR_STREAMS, DOMAIN, + HLS_PROVIDER, MAX_SEGMENTS, OUTPUT_IDLE_TIMEOUT, + RECORDER_PROVIDER, STREAM_RESTART_INCREMENT, STREAM_RESTART_RESET_TIME, ) @@ -90,7 +92,7 @@ async def async_setup(hass, config): # Setup HLS hls_endpoint = async_setup_hls(hass) - hass.data[DOMAIN][ATTR_ENDPOINTS]["hls"] = hls_endpoint + hass.data[DOMAIN][ATTR_ENDPOINTS][HLS_PROVIDER] = hls_endpoint # Setup Recorder async_setup_recorder(hass) @@ -146,7 +148,9 @@ class Stream: @callback def idle_callback(): - if (not self.keepalive or fmt == "recorder") and fmt in self._outputs: + if ( + not self.keepalive or fmt == RECORDER_PROVIDER + ) and fmt in self._outputs: self.remove_provider(self._outputs[fmt]) self.check_idle() @@ -259,19 +263,19 @@ class Stream: raise HomeAssistantError(f"Can't write {video_path}, no access to path!") # Add recorder - recorder = self.outputs().get("recorder") + recorder = self.outputs().get(RECORDER_PROVIDER) if recorder: raise HomeAssistantError( f"Stream already recording to {recorder.video_path}!" ) - recorder = self.add_provider("recorder", timeout=duration) + recorder = self.add_provider(RECORDER_PROVIDER, timeout=duration) recorder.video_path = video_path self.start() _LOGGER.debug("Started a stream recording of %s seconds", duration) # Take advantage of lookback - hls = self.outputs().get("hls") + hls = self.outputs().get(HLS_PROVIDER) if lookback > 0 and hls: num_segments = min(int(lookback // hls.target_duration), MAX_SEGMENTS) # Wait for latest segment, then add the lookback diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index a2557286cf1..e1b1b610e03 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -4,13 +4,16 @@ DOMAIN = "stream" ATTR_ENDPOINTS = "endpoints" ATTR_STREAMS = "streams" -OUTPUT_FORMATS = ["hls"] +HLS_PROVIDER = "hls" +RECORDER_PROVIDER = "recorder" + +OUTPUT_FORMATS = [HLS_PROVIDER] SEGMENT_CONTAINER_FORMAT = "mp4" # format for segments RECORDER_CONTAINER_FORMAT = "mp4" # format for recorder output AUDIO_CODECS = {"aac", "mp3"} -FORMAT_CONTENT_TYPE = {"hls": "application/vnd.apple.mpegurl"} +FORMAT_CONTENT_TYPE = {HLS_PROVIDER: "application/vnd.apple.mpegurl"} OUTPUT_IDLE_TIMEOUT = 300 # Idle timeout due to inactivity diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index 695f1d05ac3..70cd5b2eba8 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -97,6 +97,13 @@ class StreamOutput: """Return True if the output is idle.""" return self._idle_timer.idle + @property + def last_sequence(self) -> int: + """Return the last sequence number without iterating.""" + if self._segments: + return self._segments[-1].sequence + return -1 + @property def segments(self) -> list[int]: """Return current sequence from segments.""" @@ -127,8 +134,7 @@ class StreamOutput: async def recv(self) -> Segment | None: """Wait for and retrieve the latest segment.""" - last_segment = max(self.segments, default=0) - if self._cursor is None or self._cursor <= last_segment: + if self._cursor is None or self._cursor <= self.last_sequence: await self._event.wait() if not self._segments: diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index 941f4407423..4968c935d72 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -3,7 +3,12 @@ from aiohttp import web from homeassistant.core import callback -from .const import FORMAT_CONTENT_TYPE, MAX_SEGMENTS, NUM_PLAYLIST_SEGMENTS +from .const import ( + FORMAT_CONTENT_TYPE, + HLS_PROVIDER, + MAX_SEGMENTS, + NUM_PLAYLIST_SEGMENTS, +) from .core import PROVIDERS, HomeAssistant, IdleTimer, StreamOutput, StreamView from .fmp4utils import get_codec_string @@ -45,12 +50,12 @@ class HlsMasterPlaylistView(StreamView): async def handle(self, request, stream, sequence): """Return m3u8 playlist.""" - track = stream.add_provider("hls") + track = stream.add_provider(HLS_PROVIDER) stream.start() # Wait for a segment to be ready if not track.segments and not await track.recv(): return web.HTTPNotFound() - headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]} + headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]} return web.Response(body=self.render(track).encode("utf-8"), headers=headers) @@ -104,12 +109,12 @@ class HlsPlaylistView(StreamView): async def handle(self, request, stream, sequence): """Return m3u8 playlist.""" - track = stream.add_provider("hls") + track = stream.add_provider(HLS_PROVIDER) stream.start() # Wait for a segment to be ready if not track.segments and not await track.recv(): return web.HTTPNotFound() - headers = {"Content-Type": FORMAT_CONTENT_TYPE["hls"]} + headers = {"Content-Type": FORMAT_CONTENT_TYPE[HLS_PROVIDER]} return web.Response(body=self.render(track).encode("utf-8"), headers=headers) @@ -122,7 +127,7 @@ class HlsInitView(StreamView): async def handle(self, request, stream, sequence): """Return init.mp4.""" - track = stream.add_provider("hls") + track = stream.add_provider(HLS_PROVIDER) segments = track.get_segments() if not segments: return web.HTTPNotFound() @@ -139,7 +144,7 @@ class HlsSegmentView(StreamView): async def handle(self, request, stream, sequence): """Return fmp4 segment.""" - track = stream.add_provider("hls") + track = stream.add_provider(HLS_PROVIDER) segment = track.get_segment(int(sequence)) if not segment: return web.HTTPNotFound() @@ -150,7 +155,7 @@ class HlsSegmentView(StreamView): ) -@PROVIDERS.register("hls") +@PROVIDERS.register(HLS_PROVIDER) class HlsStreamOutput(StreamOutput): """Represents HLS Output formats.""" @@ -161,4 +166,4 @@ class HlsStreamOutput(StreamOutput): @property def name(self) -> str: """Return provider name.""" - return "hls" + return HLS_PROVIDER diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index 7d849375ece..ac5f102e625 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -12,7 +12,11 @@ from av.container import OutputContainer from homeassistant.core import HomeAssistant, callback -from .const import RECORDER_CONTAINER_FORMAT, SEGMENT_CONTAINER_FORMAT +from .const import ( + RECORDER_CONTAINER_FORMAT, + RECORDER_PROVIDER, + SEGMENT_CONTAINER_FORMAT, +) from .core import PROVIDERS, IdleTimer, Segment, StreamOutput _LOGGER = logging.getLogger(__name__) @@ -110,7 +114,7 @@ def recorder_save_worker(file_out: str, segments: deque[Segment]): output.close() -@PROVIDERS.register("recorder") +@PROVIDERS.register(RECORDER_PROVIDER) class RecorderOutput(StreamOutput): """Represents HLS Output formats.""" @@ -122,7 +126,7 @@ class RecorderOutput(StreamOutput): @property def name(self) -> str: """Return provider name.""" - return "recorder" + return RECORDER_PROVIDER def prepend(self, segments: list[Segment]) -> None: """Prepend segments to existing list.""" diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index cb6d6a6a017..6fe339c5dea 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -32,7 +32,9 @@ class SegmentBuffer: self._stream_id = 0 self._outputs_callback = outputs_callback self._outputs: list[StreamOutput] = [] - self._sequence = 0 + # sequence gets incremented before the first segment so the first segment + # has a sequence number of 0. + self._sequence = -1 self._segment_start_pts = None self._memory_file: BytesIO = cast(BytesIO, None) self._av_output: av.container.OutputContainer = None diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index f9b96a662d9..3e4b81bcc25 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -7,7 +7,11 @@ import av import pytest from homeassistant.components.stream import create_stream -from homeassistant.components.stream.const import MAX_SEGMENTS, NUM_PLAYLIST_SEGMENTS +from homeassistant.components.stream.const import ( + HLS_PROVIDER, + MAX_SEGMENTS, + NUM_PLAYLIST_SEGMENTS, +) from homeassistant.components.stream.core import Segment from homeassistant.const import HTTP_NOT_FOUND from homeassistant.setup import async_setup_component @@ -47,7 +51,7 @@ def hls_stream(hass, hass_client): async def create_client_for_stream(stream): http_client = await hass_client() - parsed_url = urlparse(stream.endpoint_url("hls")) + parsed_url = urlparse(stream.endpoint_url(HLS_PROVIDER)) return HlsClient(http_client, parsed_url) return create_client_for_stream @@ -93,7 +97,7 @@ async def test_hls_stream(hass, hls_stream, stream_worker_sync): stream = create_stream(hass, source) # Request stream - stream.add_provider("hls") + stream.add_provider(HLS_PROVIDER) stream.start() hls_client = await hls_stream(stream) @@ -134,9 +138,9 @@ async def test_stream_timeout(hass, hass_client, stream_worker_sync): stream = create_stream(hass, source) # Request stream - stream.add_provider("hls") + stream.add_provider(HLS_PROVIDER) stream.start() - url = stream.endpoint_url("hls") + url = stream.endpoint_url(HLS_PROVIDER) http_client = await hass_client() @@ -176,7 +180,7 @@ async def test_stream_timeout_after_stop(hass, hass_client, stream_worker_sync): stream = create_stream(hass, source) # Request stream - stream.add_provider("hls") + stream.add_provider(HLS_PROVIDER) stream.start() stream_worker_sync.resume() @@ -196,7 +200,7 @@ async def test_stream_keepalive(hass): # Setup demo HLS track source = "test_stream_keepalive_source" stream = create_stream(hass, source) - track = stream.add_provider("hls") + track = stream.add_provider(HLS_PROVIDER) track.num_segments = 2 cur_time = 0 @@ -231,7 +235,7 @@ async def test_hls_playlist_view_no_output(hass, hass_client, hls_stream): await async_setup_component(hass, "stream", {"stream": {}}) stream = create_stream(hass, STREAM_SOURCE) - stream.add_provider("hls") + stream.add_provider(HLS_PROVIDER) hls_client = await hls_stream(stream) @@ -246,7 +250,7 @@ async def test_hls_playlist_view(hass, hls_stream, stream_worker_sync): stream = create_stream(hass, STREAM_SOURCE) stream_worker_sync.pause() - hls = stream.add_provider("hls") + hls = stream.add_provider(HLS_PROVIDER) hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION)) await hass.async_block_till_done() @@ -275,7 +279,7 @@ async def test_hls_max_segments(hass, hls_stream, stream_worker_sync): stream = create_stream(hass, STREAM_SOURCE) stream_worker_sync.pause() - hls = stream.add_provider("hls") + hls = stream.add_provider(HLS_PROVIDER) hls_client = await hls_stream(stream) @@ -316,7 +320,7 @@ async def test_hls_playlist_view_discontinuity(hass, hls_stream, stream_worker_s stream = create_stream(hass, STREAM_SOURCE) stream_worker_sync.pause() - hls = stream.add_provider("hls") + hls = stream.add_provider(HLS_PROVIDER) hls.put(Segment(1, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) hls.put(Segment(2, INIT_BYTES, MOOF_BYTES, DURATION, stream_id=0)) @@ -346,7 +350,7 @@ async def test_hls_max_segments_discontinuity(hass, hls_stream, stream_worker_sy stream = create_stream(hass, STREAM_SOURCE) stream_worker_sync.pause() - hls = stream.add_provider("hls") + hls = stream.add_provider(HLS_PROVIDER) hls_client = await hls_stream(stream) diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index 9097d03a7a9..216e02a95b9 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -15,6 +15,7 @@ import av import pytest from homeassistant.components.stream import create_stream +from homeassistant.components.stream.const import HLS_PROVIDER, RECORDER_PROVIDER from homeassistant.components.stream.core import Segment from homeassistant.components.stream.fmp4utils import get_init_and_moof_data from homeassistant.components.stream.recorder import recorder_save_worker @@ -119,7 +120,7 @@ async def test_record_lookback( stream = create_stream(hass, source) # Start an HLS feed to enable lookback - stream.add_provider("hls") + stream.add_provider(HLS_PROVIDER) stream.start() with patch.object(hass.config, "is_allowed_path", return_value=True): @@ -148,7 +149,7 @@ async def test_recorder_timeout(hass, hass_client, stream_worker_sync): stream = create_stream(hass, source) with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - recorder = stream.add_provider("recorder") + recorder = stream.add_provider(RECORDER_PROVIDER) await recorder.recv() @@ -252,7 +253,7 @@ async def test_record_stream_audio( stream = create_stream(hass, source) with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") - recorder = stream.add_provider("recorder") + recorder = stream.add_provider(RECORDER_PROVIDER) while True: segment = await recorder.recv() diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index d5527105a70..501ea302172 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -23,6 +23,7 @@ import av from homeassistant.components.stream import Stream from homeassistant.components.stream.const import ( + HLS_PROVIDER, MAX_MISSING_DTS, MIN_SEGMENT_DURATION, PACKETS_TO_WAIT_FOR_AUDIO, @@ -31,7 +32,6 @@ from homeassistant.components.stream.worker import SegmentBuffer, stream_worker STREAM_SOURCE = "some-stream-source" # Formats here are arbitrary, not exercised by tests -STREAM_OUTPUT_FORMAT = "hls" AUDIO_STREAM_FORMAT = "mp3" VIDEO_STREAM_FORMAT = "h264" VIDEO_FRAME_RATE = 12 @@ -198,7 +198,7 @@ class MockPyAv: async def async_decode_stream(hass, packets, py_av=None): """Start a stream worker that decodes incoming stream packets into output segments.""" stream = Stream(hass, STREAM_SOURCE) - stream.add_provider(STREAM_OUTPUT_FORMAT) + stream.add_provider(HLS_PROVIDER) if not py_av: py_av = MockPyAv() @@ -218,7 +218,7 @@ async def async_decode_stream(hass, packets, py_av=None): async def test_stream_open_fails(hass): """Test failure on stream open.""" stream = Stream(hass, STREAM_SOURCE) - stream.add_provider(STREAM_OUTPUT_FORMAT) + stream.add_provider(HLS_PROVIDER) with patch("av.open") as av_open: av_open.side_effect = av.error.InvalidDataError(-2, "error") segment_buffer = SegmentBuffer(stream.outputs) @@ -237,9 +237,9 @@ async def test_stream_worker_success(hass): # segment arrives, hence the subtraction of one from the sequence length. assert len(segments) == int((TEST_SEQUENCE_LENGTH - 1) * SEGMENTS_PER_PACKET) # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) + assert all(segments[i].sequence == i for i in range(len(segments))) # Check segment durations - assert all([s.duration == SEGMENT_DURATION for s in segments]) + assert all(s.duration == SEGMENT_DURATION for s in segments) assert len(decoded_stream.video_packets) == TEST_SEQUENCE_LENGTH assert len(decoded_stream.audio_packets) == 0 @@ -253,7 +253,7 @@ async def test_skip_out_of_order_packet(hass): decoded_stream = await async_decode_stream(hass, iter(packets)) segments = decoded_stream.segments # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) + assert all(segments[i].sequence == i for i in range(len(segments))) # If skipped packet would have been the first packet of a segment, the previous # segment will be longer by a packet duration # We also may possibly lose a segment due to the shifting pts boundary @@ -273,7 +273,7 @@ async def test_skip_out_of_order_packet(hass): # Check number of segments assert len(segments) == int((len(packets) - 1) * SEGMENTS_PER_PACKET) # Check remaining segment durations - assert all([s.duration == SEGMENT_DURATION for s in segments]) + assert all(s.duration == SEGMENT_DURATION for s in segments) assert len(decoded_stream.video_packets) == len(packets) - 1 assert len(decoded_stream.audio_packets) == 0 @@ -290,9 +290,9 @@ async def test_discard_old_packets(hass): # Check number of segments assert len(segments) == int((OUT_OF_ORDER_PACKET_INDEX - 1) * SEGMENTS_PER_PACKET) # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) + assert all(segments[i].sequence == i for i in range(len(segments))) # Check segment durations - assert all([s.duration == SEGMENT_DURATION for s in segments]) + assert all(s.duration == SEGMENT_DURATION for s in segments) assert len(decoded_stream.video_packets) == OUT_OF_ORDER_PACKET_INDEX assert len(decoded_stream.audio_packets) == 0 @@ -309,9 +309,9 @@ async def test_packet_overflow(hass): # Check number of segments assert len(segments) == int((OUT_OF_ORDER_PACKET_INDEX - 1) * SEGMENTS_PER_PACKET) # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) + assert all(segments[i].sequence == i for i in range(len(segments))) # Check segment durations - assert all([s.duration == SEGMENT_DURATION for s in segments]) + assert all(s.duration == SEGMENT_DURATION for s in segments) assert len(decoded_stream.video_packets) == OUT_OF_ORDER_PACKET_INDEX assert len(decoded_stream.audio_packets) == 0 @@ -332,9 +332,9 @@ async def test_skip_initial_bad_packets(hass): (num_packets - num_bad_packets - 1) * SEGMENTS_PER_PACKET ) # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) + assert all(segments[i].sequence == i for i in range(len(segments))) # Check segment durations - assert all([s.duration == SEGMENT_DURATION for s in segments]) + assert all(s.duration == SEGMENT_DURATION for s in segments) assert len(decoded_stream.video_packets) == num_packets - num_bad_packets assert len(decoded_stream.audio_packets) == 0 @@ -368,8 +368,8 @@ async def test_skip_missing_dts(hass): decoded_stream = await async_decode_stream(hass, iter(packets)) segments = decoded_stream.segments # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) - # Check segment durations (not counting the elongated segment) + assert all(segments[i].sequence == i for i in range(len(segments))) + # Check segment durations (not counting the last segment) assert ( sum([segments[i].duration == SEGMENT_DURATION for i in range(len(segments))]) >= len(segments) - 1 @@ -495,9 +495,9 @@ async def test_pts_out_of_order(hass): # Check number of segments assert len(segments) == int((TEST_SEQUENCE_LENGTH - 1) * SEGMENTS_PER_PACKET) # Check sequence numbers - assert all([segments[i].sequence == i + 1 for i in range(len(segments))]) + assert all(segments[i].sequence == i for i in range(len(segments))) # Check segment durations - assert all([s.duration == SEGMENT_DURATION for s in segments]) + assert all(s.duration == SEGMENT_DURATION for s in segments) assert len(decoded_stream.video_packets) == len(packets) assert len(decoded_stream.audio_packets) == 0 @@ -512,7 +512,7 @@ async def test_stream_stopped_while_decoding(hass): worker_wake = threading.Event() stream = Stream(hass, STREAM_SOURCE) - stream.add_provider(STREAM_OUTPUT_FORMAT) + stream.add_provider(HLS_PROVIDER) py_av = MockPyAv() py_av.container.packets = PacketSequence(TEST_SEQUENCE_LENGTH) @@ -539,7 +539,7 @@ async def test_update_stream_source(hass): worker_wake = threading.Event() stream = Stream(hass, STREAM_SOURCE) - stream.add_provider(STREAM_OUTPUT_FORMAT) + stream.add_provider(HLS_PROVIDER) # Note that keepalive is not set here. The stream is "restarted" even though # it is not stopping due to failure. @@ -572,14 +572,14 @@ async def test_update_stream_source(hass): assert last_stream_source == STREAM_SOURCE + "-updated-source" worker_wake.set() - # Ccleanup + # Cleanup stream.stop() async def test_worker_log(hass, caplog): """Test that the worker logs the url without username and password.""" stream = Stream(hass, "https://abcd:efgh@foo.bar") - stream.add_provider(STREAM_OUTPUT_FORMAT) + stream.add_provider(HLS_PROVIDER) with patch("av.open") as av_open: av_open.side_effect = av.error.InvalidDataError(-2, "error") segment_buffer = SegmentBuffer(stream.outputs)