diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 3766d981da5..b842eb7fb78 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -502,7 +502,6 @@ class Stream: recorder.video_path = video_path await self.start() - self._logger.debug("Started a stream recording of %s seconds", duration) # Take advantage of lookback hls: HlsStreamOutput = cast(HlsStreamOutput, self.outputs().get(HLS_PROVIDER)) @@ -512,6 +511,9 @@ class Stream: await hls.recv() recorder.prepend(list(hls.get_segments())[-num_segments - 1 : -1]) + self._logger.debug("Started a stream recording of %s seconds", duration) + await recorder.async_record() + async def async_get_image( self, width: int | None = None, diff --git a/homeassistant/components/stream/core.py b/homeassistant/components/stream/core.py index c8d831157a8..09d9a9d5031 100644 --- a/homeassistant/components/stream/core.py +++ b/homeassistant/components/stream/core.py @@ -327,7 +327,6 @@ class StreamOutput: """Handle cleanup.""" self._event.set() self.idle_timer.clear() - self._segments = deque(maxlen=self._segments.maxlen) class StreamView(HomeAssistantView): diff --git a/homeassistant/components/stream/hls.py b/homeassistant/components/stream/hls.py index efecdcbe9dc..d3bcbb360a6 100644 --- a/homeassistant/components/stream/hls.py +++ b/homeassistant/components/stream/hls.py @@ -60,6 +60,11 @@ class HlsStreamOutput(StreamOutput): """Return provider name.""" return HLS_PROVIDER + def cleanup(self) -> None: + """Handle cleanup.""" + super().cleanup() + self._segments.clear() + @property def target_duration(self) -> float: """Return the target duration.""" diff --git a/homeassistant/components/stream/recorder.py b/homeassistant/components/stream/recorder.py index 4d97c0d683d..b33a5fbbf84 100644 --- a/homeassistant/components/stream/recorder.py +++ b/homeassistant/components/stream/recorder.py @@ -1,14 +1,11 @@ """Provide functionality to record stream.""" from __future__ import annotations -from collections import deque from io import BytesIO import logging import os -import threading import av -from av.container import OutputContainer from homeassistant.core import HomeAssistant, callback @@ -27,99 +24,9 @@ def async_setup_recorder(hass: HomeAssistant) -> None: """Only here so Provider Registry works.""" -def recorder_save_worker(file_out: str, segments: deque[Segment]) -> None: - """Handle saving stream.""" - - if not segments: - _LOGGER.error("Recording failed to capture anything") - return - - os.makedirs(os.path.dirname(file_out), exist_ok=True) - - pts_adjuster: dict[str, int | None] = {"video": None, "audio": None} - output: OutputContainer | None = None - output_v = None - output_a = None - - last_stream_id = None - # The running duration of processed segments. Note that this is in av.time_base - # units which seem to be defined inversely to how stream time_bases are defined - running_duration = 0 - - last_sequence = float("-inf") - for segment in segments: - # Because the stream_worker is in a different thread from the record service, - # the lookback segments may still have some overlap with the recorder segments - if segment.sequence <= last_sequence: - continue - last_sequence = segment.sequence - - # Open segment - source = av.open( - BytesIO(segment.init + segment.get_data()), - "r", - format=SEGMENT_CONTAINER_FORMAT, - ) - # Skip this segment if it doesn't have data - if source.duration is None: - source.close() - continue - source_v = source.streams.video[0] - source_a = source.streams.audio[0] if len(source.streams.audio) > 0 else None - - # Create output on first segment - if not output: - output = av.open( - file_out, - "w", - format=RECORDER_CONTAINER_FORMAT, - container_options={ - "video_track_timescale": str(int(1 / source_v.time_base)) - }, - ) - - # Add output streams if necessary - if not output_v: - output_v = output.add_stream(template=source_v) - context = output_v.codec_context - context.flags |= "GLOBAL_HEADER" - if source_a and not output_a: - output_a = output.add_stream(template=source_a) - - # Recalculate pts adjustments on first segment and on any discontinuity - # We are assuming time base is the same across all discontinuities - if last_stream_id != segment.stream_id: - last_stream_id = segment.stream_id - pts_adjuster["video"] = int( - (running_duration - source.start_time) - / (av.time_base * source_v.time_base) - ) - if source_a: - pts_adjuster["audio"] = int( - (running_duration - source.start_time) - / (av.time_base * source_a.time_base) - ) - - # Remux video - for packet in source.demux(): - if packet.dts is None: - continue - packet.pts += pts_adjuster[packet.stream.type] - packet.dts += pts_adjuster[packet.stream.type] - packet.stream = output_v if packet.stream.type == "video" else output_a - output.mux(packet) - - running_duration += source.duration - source.start_time - - source.close() - - if output is not None: - output.close() - - @PROVIDERS.register(RECORDER_PROVIDER) class RecorderOutput(StreamOutput): - """Represents HLS Output formats.""" + """Represents the Recorder Output format.""" def __init__( self, @@ -141,13 +48,119 @@ class RecorderOutput(StreamOutput): self._segments.extendleft(reversed(segments)) def cleanup(self) -> None: - """Write recording and clean up.""" - _LOGGER.debug("Starting recorder worker thread") - thread = threading.Thread( - name="recorder_save_worker", - target=recorder_save_worker, - args=(self.video_path, self._segments.copy()), - ) - thread.start() - + """Handle cleanup.""" + self.idle_timer.idle = True super().cleanup() + + async def async_record(self) -> None: + """Handle saving stream.""" + + os.makedirs(os.path.dirname(self.video_path), exist_ok=True) + + pts_adjuster: dict[str, int | None] = {"video": None, "audio": None} + output: av.container.OutputContainer | None = None + output_v = None + output_a = None + + last_stream_id = -1 + # The running duration of processed segments. Note that this is in av.time_base + # units which seem to be defined inversely to how stream time_bases are defined + running_duration = 0 + + last_sequence = float("-inf") + + def write_segment(segment: Segment) -> None: + """Write a segment to output.""" + nonlocal output, output_v, output_a, last_stream_id, running_duration, last_sequence + # Because the stream_worker is in a different thread from the record service, + # the lookback segments may still have some overlap with the recorder segments + if segment.sequence <= last_sequence: + return + last_sequence = segment.sequence + + # Open segment + source = av.open( + BytesIO(segment.init + segment.get_data()), + "r", + format=SEGMENT_CONTAINER_FORMAT, + ) + # Skip this segment if it doesn't have data + if source.duration is None: + source.close() + return + source_v = source.streams.video[0] + source_a = ( + source.streams.audio[0] if len(source.streams.audio) > 0 else None + ) + + # Create output on first segment + if not output: + output = av.open( + self.video_path + ".tmp", + "w", + format=RECORDER_CONTAINER_FORMAT, + container_options={ + "video_track_timescale": str(int(1 / source_v.time_base)) + }, + ) + + # Add output streams if necessary + if not output_v: + output_v = output.add_stream(template=source_v) + context = output_v.codec_context + context.flags |= "GLOBAL_HEADER" + if source_a and not output_a: + output_a = output.add_stream(template=source_a) + + # Recalculate pts adjustments on first segment and on any discontinuity + # We are assuming time base is the same across all discontinuities + if last_stream_id != segment.stream_id: + last_stream_id = segment.stream_id + pts_adjuster["video"] = int( + (running_duration - source.start_time) + / (av.time_base * source_v.time_base) + ) + if source_a: + pts_adjuster["audio"] = int( + (running_duration - source.start_time) + / (av.time_base * source_a.time_base) + ) + + # Remux video + for packet in source.demux(): + if packet.dts is None: + continue + packet.pts += pts_adjuster[packet.stream.type] + packet.dts += pts_adjuster[packet.stream.type] + packet.stream = output_v if packet.stream.type == "video" else output_a + output.mux(packet) + + running_duration += source.duration - source.start_time + + source.close() + + # Write lookback segments + while len(self._segments) > 1: # The last segment is in progress + await self._hass.async_add_executor_job( + write_segment, self._segments.popleft() + ) + # Make sure the first segment has been added + if not self._segments: + await self.recv() + # Write segments as soon as they are completed + while not self.idle: + await self.recv() + await self._hass.async_add_executor_job( + write_segment, self._segments.popleft() + ) + # Write remaining segments + # Should only have 0 or 1 segments, but loop through just in case + while self._segments: + await self._hass.async_add_executor_job( + write_segment, self._segments.popleft() + ) + if output is None: + _LOGGER.error("Recording failed to capture anything") + else: + output.close() + os.rename(self.video_path + ".tmp", self.video_path) diff --git a/tests/components/stream/conftest.py b/tests/components/stream/conftest.py index a3d2da8bd52..91b4106c1f4 100644 --- a/tests/components/stream/conftest.py +++ b/tests/components/stream/conftest.py @@ -12,7 +12,6 @@ so that it can inspect the output. from __future__ import annotations import asyncio -from collections import deque from http import HTTPStatus import logging import threading @@ -20,10 +19,9 @@ from typing import Generator from unittest.mock import Mock, patch from aiohttp import web -import async_timeout import pytest -from homeassistant.components.stream.core import Segment, StreamOutput +from homeassistant.components.stream.core import StreamOutput from homeassistant.components.stream.worker import StreamState from .common import generate_h264_video, stream_teardown @@ -73,61 +71,6 @@ def stream_worker_sync(hass): yield sync -class SaveRecordWorkerSync: - """ - Test fixture to manage RecordOutput thread for recorder_save_worker. - - This is used to assert that the worker is started and stopped cleanly - to avoid thread leaks in tests. - """ - - def __init__(self, hass): - """Initialize SaveRecordWorkerSync.""" - self._hass = hass - self._save_event = None - self._segments = None - self._save_thread = None - self.reset() - - def recorder_save_worker(self, file_out: str, segments: deque[Segment]): - """Mock method for patch.""" - logging.debug("recorder_save_worker thread started") - assert self._save_thread is None - self._segments = segments - self._save_thread = threading.current_thread() - self._hass.loop.call_soon_threadsafe(self._save_event.set) - - async def get_segments(self): - """Return the recorded video segments.""" - async with async_timeout.timeout(TEST_TIMEOUT): - await self._save_event.wait() - return self._segments - - async def join(self): - """Verify save worker was invoked and block on shutdown.""" - async with async_timeout.timeout(TEST_TIMEOUT): - await self._save_event.wait() - self._save_thread.join(timeout=TEST_TIMEOUT) - assert not self._save_thread.is_alive() - - def reset(self): - """Reset callback state for reuse in tests.""" - self._save_thread = None - self._save_event = asyncio.Event() - - -@pytest.fixture() -def record_worker_sync(hass): - """Patch recorder_save_worker for clean thread shutdown for test.""" - sync = SaveRecordWorkerSync(hass) - with patch( - "homeassistant.components.stream.recorder.recorder_save_worker", - side_effect=sync.recorder_save_worker, - autospec=True, - ): - yield sync - - class HLSSync: """Test fixture that intercepts stream worker calls to StreamOutput.""" diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index 7343b96ef9a..715e69fb889 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -506,10 +506,12 @@ async def test_remove_incomplete_segment_on_exit( assert len(segments) == 3 assert not segments[-1].complete stream_worker_sync.resume() - stream._thread_quit.set() - stream._thread.join() - stream._thread = None - await hass.async_block_till_done() - assert segments[-1].complete - assert len(segments) == 2 + with patch("homeassistant.components.stream.Stream.remove_provider"): + # Patch remove_provider so the deque is not cleared + stream._thread_quit.set() + stream._thread.join() + stream._thread = None + await hass.async_block_till_done() + assert segments[-1].complete + assert len(segments) == 2 await stream.stop() diff --git a/tests/components/stream/test_recorder.py b/tests/components/stream/test_recorder.py index 9433cbd449d..d7595b47679 100644 --- a/tests/components/stream/test_recorder.py +++ b/tests/components/stream/test_recorder.py @@ -1,4 +1,5 @@ -"""The tests for hls streams.""" +"""The tests for recording streams.""" +import asyncio from datetime import timedelta from io import BytesIO import os @@ -7,11 +8,14 @@ from unittest.mock import patch import av import pytest -from homeassistant.components.stream import create_stream -from homeassistant.components.stream.const import HLS_PROVIDER, RECORDER_PROVIDER +from homeassistant.components.stream import Stream, create_stream +from homeassistant.components.stream.const import ( + HLS_PROVIDER, + OUTPUT_IDLE_TIMEOUT, + RECORDER_PROVIDER, +) from homeassistant.components.stream.core import Part from homeassistant.components.stream.fmp4utils import find_box -from homeassistant.components.stream.recorder import recorder_save_worker from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import async_setup_component import homeassistant.util.dt as dt_util @@ -20,40 +24,55 @@ from .common import DefaultSegment as Segment, generate_h264_video, remux_with_a from tests.common import async_fire_time_changed -MAX_ABORT_SEGMENTS = 20 # Abort test to avoid looping forever - -async def test_record_stream(hass, hass_client, record_worker_sync, h264_video): - """ - Test record stream. - - Tests full integration with the stream component, and captures the - stream worker and save worker to allow for clean shutdown of background - threads. The actual save logic is tested in test_recorder_save below. - """ +@pytest.fixture(autouse=True) +async def stream_component(hass): + """Set up the component before each test.""" await async_setup_component(hass, "stream", {"stream": {}}) - # Setup demo track - stream = create_stream(hass, h264_video, {}) + +@pytest.fixture +def filename(tmpdir): + """Use this filename for the tests.""" + return f"{tmpdir}/test.mp4" + + +async def test_record_stream(hass, filename, h264_video): + """Test record stream.""" + + worker_finished = asyncio.Event() + + class MockStream(Stream): + """Mock Stream so we can patch remove_provider.""" + + async def remove_provider(self, provider): + """Add a finished event to Stream.remove_provider.""" + await Stream.remove_provider(self, provider) + worker_finished.set() + + with patch("homeassistant.components.stream.Stream", wraps=MockStream): + stream = create_stream(hass, h264_video, {}) + with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") + make_recording = hass.async_create_task(stream.async_record(filename)) - # After stream decoding finishes, the record worker thread starts - segments = await record_worker_sync.get_segments() - assert len(segments) >= 1 + # In general usage the recorder will only include what has already been + # processed by the worker. To guarantee we have some output for the test, + # wait until the worker has finished before firing + await worker_finished.wait() - # Verify that the save worker was invoked, then block until its - # thread completes and is shutdown completely to avoid thread leaks. - await record_worker_sync.join() + # Fire the IdleTimer + future = dt_util.utcnow() + timedelta(seconds=30) + async_fire_time_changed(hass, future) - await stream.stop() + await make_recording + + # Assert + assert os.path.exists(filename) -async def test_record_lookback( - hass, hass_client, stream_worker_sync, record_worker_sync, h264_video -): +async def test_record_lookback(hass, h264_video): """Exercise record with loopback.""" - await async_setup_component(hass, "stream", {"stream": {}}) stream = create_stream(hass, h264_video, {}) @@ -69,42 +88,8 @@ async def test_record_lookback( await stream.stop() -async def test_recorder_timeout(hass, hass_client, stream_worker_sync, h264_video): - """ - Test recorder timeout. - - Mocks out the cleanup to assert that it is invoked after a timeout. - This test does not start the recorder save thread. - """ - await async_setup_component(hass, "stream", {"stream": {}}) - - stream_worker_sync.pause() - - with patch("homeassistant.components.stream.IdleTimer.fire") as mock_timeout: - # Setup demo track - stream = create_stream(hass, h264_video, {}) - with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") - recorder = stream.add_provider(RECORDER_PROVIDER) - - await recorder.recv() - - # Wait a minute - future = dt_util.utcnow() + timedelta(minutes=1) - async_fire_time_changed(hass, future) - await hass.async_block_till_done() - - assert mock_timeout.called - - stream_worker_sync.resume() - await stream.stop() - await hass.async_block_till_done() - await hass.async_block_till_done() - - -async def test_record_path_not_allowed(hass, hass_client, h264_video): +async def test_record_path_not_allowed(hass, h264_video): """Test where the output path is not allowed by home assistant configuration.""" - await async_setup_component(hass, "stream", {"stream": {}}) stream = create_stream(hass, h264_video, {}) with patch.object( @@ -127,25 +112,8 @@ def add_parts_to_segment(segment, source): ] -async def test_recorder_save(tmpdir, h264_video): - """Test recorder save.""" - # Setup - filename = f"{tmpdir}/test.mp4" - - # Run - segment = Segment(sequence=1) - add_parts_to_segment(segment, h264_video) - segment.duration = 4 - recorder_save_worker(filename, [segment]) - - # Assert - assert os.path.exists(filename) - - -async def test_recorder_discontinuity(tmpdir, h264_video): +async def test_recorder_discontinuity(hass, filename, h264_video): """Test recorder save across a discontinuity.""" - # Setup - filename = f"{tmpdir}/test.mp4" # Run segment_1 = Segment(sequence=1, stream_id=0) @@ -154,18 +122,50 @@ async def test_recorder_discontinuity(tmpdir, h264_video): segment_2 = Segment(sequence=2, stream_id=1) add_parts_to_segment(segment_2, h264_video) segment_2.duration = 4 - recorder_save_worker(filename, [segment_1, segment_2]) + + provider_ready = asyncio.Event() + + class MockStream(Stream): + """Mock Stream so we can patch add_provider.""" + + async def start(self): + """Make Stream.start a noop that gives up async context.""" + await asyncio.sleep(0) + + def add_provider(self, fmt, timeout=OUTPUT_IDLE_TIMEOUT): + """Add a finished event to Stream.add_provider.""" + provider = Stream.add_provider(self, fmt, timeout) + provider_ready.set() + return provider + + with patch.object(hass.config, "is_allowed_path", return_value=True), patch( + "homeassistant.components.stream.Stream", wraps=MockStream + ), patch("homeassistant.components.stream.recorder.RecorderOutput.recv"): + stream = create_stream(hass, "blank", {}) + make_recording = hass.async_create_task(stream.async_record(filename)) + await provider_ready.wait() + + recorder_output = stream.outputs()[RECORDER_PROVIDER] + recorder_output.idle_timer.start() + recorder_output._segments.extend([segment_1, segment_2]) + + # Fire the IdleTimer + future = dt_util.utcnow() + timedelta(seconds=30) + async_fire_time_changed(hass, future) + + await make_recording # Assert assert os.path.exists(filename) -async def test_recorder_no_segments(tmpdir): +async def test_recorder_no_segments(hass, filename): """Test recorder behavior with a stream failure which causes no segments.""" - # Setup - filename = f"{tmpdir}/test.mp4" + + stream = create_stream(hass, BytesIO(), {}) # Run - recorder_save_worker("unused-file", []) + with patch.object(hass.config, "is_allowed_path", return_value=True): + await stream.async_record(filename) # Assert assert not os.path.exists(filename) @@ -188,9 +188,7 @@ def h264_mov_video(): ) async def test_record_stream_audio( hass, - hass_client, - stream_worker_sync, - record_worker_sync, + filename, audio_codec, expected_audio_streams, h264_mov_video, @@ -201,28 +199,42 @@ async def test_record_stream_audio( Record stream output should have an audio channel when input has a valid codec and audio packets and no audio channel otherwise. """ - await async_setup_component(hass, "stream", {"stream": {}}) # Remux source video with new audio source = remux_with_audio(h264_mov_video, "mov", audio_codec) # mov can store PCM - record_worker_sync.reset() - stream_worker_sync.pause() + worker_finished = asyncio.Event() + + class MockStream(Stream): + """Mock Stream so we can patch remove_provider.""" + + async def remove_provider(self, provider): + """Add a finished event to Stream.remove_provider.""" + await Stream.remove_provider(self, provider) + worker_finished.set() + + with patch("homeassistant.components.stream.Stream", wraps=MockStream): + stream = create_stream(hass, source, {}) - stream = create_stream(hass, source, {}) with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") - recorder = stream.add_provider(RECORDER_PROVIDER) + make_recording = hass.async_create_task(stream.async_record(filename)) - while True: - await recorder.recv() - if not (segment := recorder.last_segment): - break - last_segment = segment - stream_worker_sync.resume() + # In general usage the recorder will only include what has already been + # processed by the worker. To guarantee we have some output for the test, + # wait until the worker has finished before firing + await worker_finished.wait() + + # Fire the IdleTimer + future = dt_util.utcnow() + timedelta(seconds=30) + async_fire_time_changed(hass, future) + + await make_recording + + # Assert + assert os.path.exists(filename) result = av.open( - BytesIO(last_segment.init + last_segment.get_data()), + filename, "r", format="mp4", ) @@ -232,14 +244,9 @@ async def test_record_stream_audio( await stream.stop() await hass.async_block_till_done() - # Verify that the save worker was invoked, then block until its - # thread completes and is shutdown completely to avoid thread leaks. - await record_worker_sync.join() - async def test_recorder_log(hass, caplog): """Test starting a stream to record logs the url without username and password.""" - await async_setup_component(hass, "stream", {"stream": {}}) stream = create_stream(hass, "https://abcd:efgh@foo.bar", {}) with patch.object(hass.config, "is_allowed_path", return_value=True): await stream.async_record("/example/path") diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 298d7287e69..863a289c2c5 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -13,6 +13,7 @@ pushed to the output streams. The packet sequence can be used to exercise failure modes or corner cases like how out of order packets are handled. """ +import asyncio import fractions import io import logging @@ -33,6 +34,7 @@ from homeassistant.components.stream.const import ( HLS_PROVIDER, MAX_MISSING_DTS, PACKETS_TO_WAIT_FOR_AUDIO, + RECORDER_PROVIDER, SEGMENT_DURATION_ADJUSTER, TARGET_SEGMENT_DURATION_NON_LL_HLS, ) @@ -732,7 +734,23 @@ async def test_worker_log(hass, caplog): assert "https://abcd:efgh@foo.bar" not in caplog.text -async def test_durations(hass, record_worker_sync): +@pytest.fixture +def worker_finished_stream(): + """Fixture that helps call a stream and wait for the worker to finish.""" + worker_finished = asyncio.Event() + + class MockStream(Stream): + """Mock Stream so we can patch remove_provider.""" + + async def remove_provider(self, provider): + """Add a finished event to Stream.remove_provider.""" + await Stream.remove_provider(self, provider) + worker_finished.set() + + return worker_finished, MockStream + + +async def test_durations(hass, worker_finished_stream): """Test that the duration metadata matches the media.""" # Use a target part duration which has a slight mismatch @@ -751,13 +769,17 @@ async def test_durations(hass, record_worker_sync): ) source = generate_h264_video(duration=SEGMENT_DURATION + 1) - stream = create_stream(hass, source, {}, stream_label="camera") + worker_finished, mock_stream = worker_finished_stream - # use record_worker_sync to grab output segments - with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") + with patch("homeassistant.components.stream.Stream", wraps=mock_stream): + stream = create_stream(hass, source, {}, stream_label="camera") + + recorder_output = stream.add_provider(RECORDER_PROVIDER, timeout=30) + await stream.start() + await worker_finished.wait() + + complete_segments = list(recorder_output.get_segments())[:-1] - complete_segments = list(await record_worker_sync.get_segments())[:-1] assert len(complete_segments) >= 1 # check that the Part duration metadata matches the durations in the media @@ -803,12 +825,10 @@ async def test_durations(hass, record_worker_sync): abs_tol=1e-6, ) - await record_worker_sync.join() - await stream.stop() -async def test_has_keyframe(hass, record_worker_sync, h264_video): +async def test_has_keyframe(hass, h264_video, worker_finished_stream): """Test that the has_keyframe metadata matches the media.""" await async_setup_component( hass, @@ -824,13 +844,17 @@ async def test_has_keyframe(hass, record_worker_sync, h264_video): }, ) - stream = create_stream(hass, h264_video, {}, stream_label="camera") + worker_finished, mock_stream = worker_finished_stream - # use record_worker_sync to grab output segments - with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") + with patch("homeassistant.components.stream.Stream", wraps=mock_stream): + stream = create_stream(hass, h264_video, {}, stream_label="camera") + + recorder_output = stream.add_provider(RECORDER_PROVIDER, timeout=30) + await stream.start() + await worker_finished.wait() + + complete_segments = list(recorder_output.get_segments())[:-1] - complete_segments = list(await record_worker_sync.get_segments())[:-1] assert len(complete_segments) >= 1 # check that the Part has_keyframe metadata matches the keyframes in the media @@ -843,12 +867,10 @@ async def test_has_keyframe(hass, record_worker_sync, h264_video): av_part.close() assert part.has_keyframe == media_has_keyframe - await record_worker_sync.join() - await stream.stop() -async def test_h265_video_is_hvc1(hass, record_worker_sync): +async def test_h265_video_is_hvc1(hass, worker_finished_stream): """Test that a h265 video gets muxed as hvc1.""" await async_setup_component( hass, @@ -863,13 +885,16 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync): ) source = generate_h265_video() - stream = create_stream(hass, source, {}, stream_label="camera") - # use record_worker_sync to grab output segments - with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") + worker_finished, mock_stream = worker_finished_stream + with patch("homeassistant.components.stream.Stream", wraps=mock_stream): + stream = create_stream(hass, source, {}, stream_label="camera") - complete_segments = list(await record_worker_sync.get_segments())[:-1] + recorder_output = stream.add_provider(RECORDER_PROVIDER, timeout=30) + await stream.start() + await worker_finished.wait() + + complete_segments = list(recorder_output.get_segments())[:-1] assert len(complete_segments) >= 1 segment = complete_segments[0] @@ -878,8 +903,6 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync): assert av_part.streams.video[0].codec_tag == "hvc1" av_part.close() - await record_worker_sync.join() - await stream.stop() assert stream.get_diagnostics() == { @@ -891,7 +914,7 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync): } -async def test_get_image(hass, record_worker_sync): +async def test_get_image(hass): """Test that the has_keyframe metadata matches the media.""" await async_setup_component(hass, "stream", {"stream": {}}) @@ -904,14 +927,11 @@ async def test_get_image(hass, record_worker_sync): mock_turbo_jpeg_singleton.instance.return_value = mock_turbo_jpeg() stream = create_stream(hass, source, {}) - # use record_worker_sync to grab output segments with patch.object(hass.config, "is_allowed_path", return_value=True): - await stream.async_record("/example/path") - + make_recording = hass.async_create_task(stream.async_record("/example/path")) + await make_recording assert stream._keyframe_converter._image is None - await record_worker_sync.join() - assert await stream.async_get_image() == EMPTY_8_6_JPEG await stream.stop()