From 6edf06f8a4240bcda0ac74c175faa5f1429dca64 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Fri, 27 Dec 2024 18:47:33 -0800 Subject: [PATCH] Converge stream av open methods, options, and error handling (#134020) * Converge stream av open methods, options, and error handling * Remove exception that is never thrown * Update exceptions thrown in generic tests * Increase stream test coverage --- .../components/generic/config_flow.py | 6 - homeassistant/components/stream/__init__.py | 178 ++++++------------ homeassistant/components/stream/const.py | 18 +- homeassistant/components/stream/exceptions.py | 32 ++++ homeassistant/components/stream/worker.py | 59 ++++-- homeassistant/components/tplink/camera.py | 2 +- .../components/tplink/config_flow.py | 2 +- tests/components/generic/test_config_flow.py | 29 +-- tests/components/stream/test_hls.py | 16 +- tests/components/stream/test_init.py | 100 +++++++--- tests/components/stream/test_worker.py | 21 ++- tests/components/tplink/test_camera.py | 3 +- tests/components/tplink/test_config_flow.py | 7 +- 13 files changed, 267 insertions(+), 206 deletions(-) create mode 100644 homeassistant/components/stream/exceptions.py diff --git a/homeassistant/components/generic/config_flow.py b/homeassistant/components/generic/config_flow.py index 83894b489f0..1c06a7921cb 100644 --- a/homeassistant/components/generic/config_flow.py +++ b/homeassistant/components/generic/config_flow.py @@ -255,10 +255,6 @@ async def async_test_and_preview_stream( """ if not (stream_source := info.get(CONF_STREAM_SOURCE)): return None - # Import from stream.worker as stream cannot reexport from worker - # without forcing the av dependency on default_config - # pylint: disable-next=import-outside-toplevel - from homeassistant.components.stream.worker import StreamWorkerError if not isinstance(stream_source, template_helper.Template): stream_source = template_helper.Template(stream_source, hass) @@ -294,8 +290,6 @@ async def async_test_and_preview_stream( f"{DOMAIN}.test_stream", ) hls_provider = stream.add_provider(HLS_PROVIDER) - except StreamWorkerError as err: - raise InvalidStreamException("unknown_with_details", str(err)) from err except PermissionError as err: raise InvalidStreamException("stream_not_permitted") from err except OSError as err: diff --git a/homeassistant/components/stream/__init__.py b/homeassistant/components/stream/__init__.py index 1471db890d7..8692a2acaad 100644 --- a/homeassistant/components/stream/__init__.py +++ b/homeassistant/components/stream/__init__.py @@ -20,7 +20,6 @@ from __future__ import annotations import asyncio from collections.abc import Callable, Mapping import copy -from enum import IntEnum import logging import secrets import threading @@ -41,12 +40,12 @@ from homeassistant.util.async_ import create_eager_task from .const import ( ATTR_ENDPOINTS, + ATTR_PREFER_TCP, ATTR_SETTINGS, ATTR_STREAMS, CONF_EXTRA_PART_WAIT_TIME, CONF_LL_HLS, CONF_PART_DURATION, - CONF_PREFER_TCP, CONF_RTSP_TRANSPORT, CONF_SEGMENT_DURATION, CONF_USE_WALLCLOCK_AS_TIMESTAMPS, @@ -62,6 +61,7 @@ from .const import ( SOURCE_TIMEOUT, STREAM_RESTART_INCREMENT, STREAM_RESTART_RESET_TIME, + StreamClientError, ) from .core import ( PROVIDERS, @@ -73,11 +73,10 @@ from .core import ( StreamSettings, ) from .diagnostics import Diagnostics +from .exceptions import StreamOpenClientError, StreamWorkerError from .hls import HlsStreamOutput, async_setup_hls if TYPE_CHECKING: - from av.container import InputContainer, OutputContainer - from homeassistant.components.camera import DynamicStreamSettings __all__ = [ @@ -92,6 +91,8 @@ __all__ = [ "RTSP_TRANSPORTS", "SOURCE_TIMEOUT", "Stream", + "StreamClientError", + "StreamOpenClientError", "create_stream", "Orientation", ] @@ -99,91 +100,6 @@ __all__ = [ _LOGGER = logging.getLogger(__name__) -class StreamClientError(IntEnum): - """Enum for stream client errors.""" - - BadRequest = 400 - Unauthorized = 401 - Forbidden = 403 - NotFound = 404 - Other = 4 - - -class StreamOpenClientError(HomeAssistantError): - """Raised when client error received when trying to open a stream. - - :param stream_client_error: The type of client error - """ - - def __init__( - self, *args: Any, stream_client_error: StreamClientError, **kwargs: Any - ) -> None: - self.stream_client_error = stream_client_error - super().__init__(*args, **kwargs) - - -async def _async_try_open_stream( - hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None -) -> InputContainer | OutputContainer: - """Try to open a stream. - - Will raise StreamOpenClientError if an http client error is encountered. - """ - return await hass.loop.run_in_executor(None, _try_open_stream, source, pyav_options) - - -def _try_open_stream( - source: str, pyav_options: dict[str, str] | None = None -) -> InputContainer | OutputContainer: - """Try to open a stream. - - Will raise StreamOpenClientError if an http client error is encountered. - """ - import av # pylint: disable=import-outside-toplevel - - if pyav_options is None: - pyav_options = {} - - default_pyav_options = { - "rtsp_flags": CONF_PREFER_TCP, - "timeout": str(SOURCE_TIMEOUT), - } - - pyav_options = { - **default_pyav_options, - **pyav_options, - } - - try: - container = av.open(source, options=pyav_options, timeout=5) - - except av.HTTPBadRequestError as ex: - raise StreamOpenClientError( - stream_client_error=StreamClientError.BadRequest - ) from ex - - except av.HTTPUnauthorizedError as ex: - raise StreamOpenClientError( - stream_client_error=StreamClientError.Unauthorized - ) from ex - - except av.HTTPForbiddenError as ex: - raise StreamOpenClientError( - stream_client_error=StreamClientError.Forbidden - ) from ex - - except av.HTTPNotFoundError as ex: - raise StreamOpenClientError( - stream_client_error=StreamClientError.NotFound - ) from ex - - except av.HTTPOtherClientError as ex: - raise StreamOpenClientError(stream_client_error=StreamClientError.Other) from ex - - else: - return container - - async def async_check_stream_client_error( hass: HomeAssistant, source: str, pyav_options: dict[str, str] | None = None ) -> None: @@ -192,18 +108,24 @@ async def async_check_stream_client_error( Raise StreamOpenClientError if an http client error is encountered. """ await hass.loop.run_in_executor( - None, _check_stream_client_error, source, pyav_options + None, _check_stream_client_error, hass, source, pyav_options ) def _check_stream_client_error( - source: str, pyav_options: dict[str, str] | None = None + hass: HomeAssistant, source: str, options: dict[str, str] | None = None ) -> None: """Check if a stream can be successfully opened. Raise StreamOpenClientError if an http client error is encountered. """ - _try_open_stream(source, pyav_options).close() + from .worker import try_open_stream # pylint: disable=import-outside-toplevel + + pyav_options, _ = _convert_stream_options(hass, source, options or {}) + try: + try_open_stream(source, pyav_options).close() + except StreamWorkerError as err: + raise StreamOpenClientError(str(err), err.error_code) from err def redact_credentials(url: str) -> str: @@ -219,6 +141,42 @@ def redact_credentials(url: str) -> str: return str(yurl.update_query(redacted_query_params)) +def _convert_stream_options( + hass: HomeAssistant, + stream_source: str, + stream_options: Mapping[str, str | bool | float], +) -> tuple[dict[str, str], StreamSettings]: + """Convert options from stream options into PyAV options and stream settings.""" + if DOMAIN not in hass.data: + raise HomeAssistantError("Stream integration is not set up.") + + stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS]) + pyav_options: dict[str, str] = {} + try: + STREAM_OPTIONS_SCHEMA(stream_options) + except vol.Invalid as exc: + raise HomeAssistantError(f"Invalid stream options: {exc}") from exc + + if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME): + stream_settings.hls_part_timeout += extra_wait_time + if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT): + assert isinstance(rtsp_transport, str) + # The PyAV options currently match the stream CONF constants, but this + # will not necessarily always be the case, so they are hard coded here + pyav_options["rtsp_transport"] = rtsp_transport + if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): + pyav_options["use_wallclock_as_timestamps"] = "1" + + # For RTSP streams, prefer TCP + if isinstance(stream_source, str) and stream_source[:7] == "rtsp://": + pyav_options = { + "rtsp_flags": ATTR_PREFER_TCP, + "stimeout": "5000000", + **pyav_options, + } + return pyav_options, stream_settings + + def create_stream( hass: HomeAssistant, stream_source: str, @@ -234,41 +192,13 @@ def create_stream( The stream_label is a string used as an additional message in logging. """ - def convert_stream_options( - hass: HomeAssistant, stream_options: Mapping[str, str | bool | float] - ) -> tuple[dict[str, str], StreamSettings]: - """Convert options from stream options into PyAV options and stream settings.""" - stream_settings = copy.copy(hass.data[DOMAIN][ATTR_SETTINGS]) - pyav_options: dict[str, str] = {} - try: - STREAM_OPTIONS_SCHEMA(stream_options) - except vol.Invalid as exc: - raise HomeAssistantError("Invalid stream options") from exc - - if extra_wait_time := stream_options.get(CONF_EXTRA_PART_WAIT_TIME): - stream_settings.hls_part_timeout += extra_wait_time - if rtsp_transport := stream_options.get(CONF_RTSP_TRANSPORT): - assert isinstance(rtsp_transport, str) - # The PyAV options currently match the stream CONF constants, but this - # will not necessarily always be the case, so they are hard coded here - pyav_options["rtsp_transport"] = rtsp_transport - if stream_options.get(CONF_USE_WALLCLOCK_AS_TIMESTAMPS): - pyav_options["use_wallclock_as_timestamps"] = "1" - - return pyav_options, stream_settings - if DOMAIN not in hass.config.components: raise HomeAssistantError("Stream integration is not set up.") # Convert extra stream options into PyAV options and stream settings - pyav_options, stream_settings = convert_stream_options(hass, options) - # For RTSP streams, prefer TCP - if isinstance(stream_source, str) and stream_source[:7] == "rtsp://": - pyav_options = { - "rtsp_flags": "prefer_tcp", - "stimeout": "5000000", - **pyav_options, - } + pyav_options, stream_settings = _convert_stream_options( + hass, stream_source, options + ) stream = Stream( hass, @@ -531,7 +461,7 @@ class Stream: """Handle consuming streams and restart keepalive streams.""" # Keep import here so that we can import stream integration without installing reqs # pylint: disable-next=import-outside-toplevel - from .worker import StreamState, StreamWorkerError, stream_worker + from .worker import StreamState, stream_worker stream_state = StreamState(self.hass, self.outputs, self._diagnostics) wait_timeout = 0 diff --git a/homeassistant/components/stream/const.py b/homeassistant/components/stream/const.py index 66455ffad1a..c81d2f6cb18 100644 --- a/homeassistant/components/stream/const.py +++ b/homeassistant/components/stream/const.py @@ -2,6 +2,7 @@ from __future__ import annotations +from enum import IntEnum from typing import Final DOMAIN = "stream" @@ -48,7 +49,7 @@ CONF_LL_HLS = "ll_hls" CONF_PART_DURATION = "part_duration" CONF_SEGMENT_DURATION = "segment_duration" -CONF_PREFER_TCP = "prefer_tcp" +ATTR_PREFER_TCP = "prefer_tcp" CONF_RTSP_TRANSPORT = "rtsp_transport" # The first dict entry below may be used as the default when populating options RTSP_TRANSPORTS = { @@ -59,3 +60,18 @@ RTSP_TRANSPORTS = { } CONF_USE_WALLCLOCK_AS_TIMESTAMPS = "use_wallclock_as_timestamps" CONF_EXTRA_PART_WAIT_TIME = "extra_part_wait_time" + + +class StreamClientError(IntEnum): + """Enum for stream client errors. + + These are errors that can be returned by the stream client when trying to + open a stream. The caller should not interpret the int values directly, but + should use the enum values instead. + """ + + BadRequest = 400 + Unauthorized = 401 + Forbidden = 403 + NotFound = 404 + Other = 4 diff --git a/homeassistant/components/stream/exceptions.py b/homeassistant/components/stream/exceptions.py new file mode 100644 index 00000000000..364ef6f3a02 --- /dev/null +++ b/homeassistant/components/stream/exceptions.py @@ -0,0 +1,32 @@ +"""Stream component exceptions.""" + +from homeassistant.exceptions import HomeAssistantError + +from .const import StreamClientError + + +class StreamOpenClientError(HomeAssistantError): + """Raised when client error received when trying to open a stream. + + :param stream_client_error: The type of client error + """ + + def __init__(self, message: str, error_code: StreamClientError) -> None: + """Initialize a stream open client error.""" + super().__init__(message) + self.error_code = error_code + + +class StreamWorkerError(Exception): + """An exception thrown while processing a stream.""" + + def __init__( + self, message: str, error_code: StreamClientError = StreamClientError.Other + ) -> None: + """Initialize a stream worker error.""" + super().__init__(message) + self.error_code = error_code + + +class StreamEndedError(StreamWorkerError): + """Raised when the stream is complete, exposed for facilitating testing.""" diff --git a/homeassistant/components/stream/worker.py b/homeassistant/components/stream/worker.py index 8c9bb1b8e9e..0c1f38938eb 100644 --- a/homeassistant/components/stream/worker.py +++ b/homeassistant/components/stream/worker.py @@ -15,6 +15,7 @@ from typing import Any, Self, cast import av import av.audio import av.container +from av.container import InputContainer import av.stream from homeassistant.core import HomeAssistant @@ -29,6 +30,7 @@ from .const import ( PACKETS_TO_WAIT_FOR_AUDIO, SEGMENT_CONTAINER_FORMAT, SOURCE_TIMEOUT, + StreamClientError, ) from .core import ( STREAM_SETTINGS_NON_LL_HLS, @@ -39,6 +41,7 @@ from .core import ( StreamSettings, ) from .diagnostics import Diagnostics +from .exceptions import StreamEndedError, StreamWorkerError from .fmp4utils import read_init from .hls import HlsStreamOutput @@ -46,10 +49,6 @@ _LOGGER = logging.getLogger(__name__) NEGATIVE_INF = float("-inf") -class StreamWorkerError(Exception): - """An exception thrown while processing a stream.""" - - def redact_av_error_string(err: av.FFmpegError) -> str: """Return an error string with credentials redacted from the url.""" parts = [str(err.type), err.strerror] # type: ignore[attr-defined] @@ -58,10 +57,6 @@ def redact_av_error_string(err: av.FFmpegError) -> str: return ", ".join(parts) -class StreamEndedError(StreamWorkerError): - """Raised when the stream is complete, exposed for facilitating testing.""" - - class StreamState: """Responsible for trakcing output and playback state for a stream. @@ -512,6 +507,47 @@ def get_audio_bitstream_filter( return None +def try_open_stream( + source: str, + pyav_options: dict[str, str], +) -> InputContainer: + """Try to open a stream. + + Will raise StreamOpenClientError if an http client error is encountered. + """ + + try: + return av.open(source, options=pyav_options, timeout=SOURCE_TIMEOUT) + except av.HTTPBadRequestError as err: + raise StreamWorkerError( + f"Bad Request Error opening stream ({redact_av_error_string(err)})", + error_code=StreamClientError.BadRequest, + ) from err + + except av.HTTPUnauthorizedError as err: + raise StreamWorkerError( + f"Unauthorized error opening stream ({redact_av_error_string(err)})", + error_code=StreamClientError.Unauthorized, + ) from err + + except av.HTTPForbiddenError as err: + raise StreamWorkerError( + f"Forbidden error opening stream ({redact_av_error_string(err)})", + error_code=StreamClientError.Forbidden, + ) from err + + except av.HTTPNotFoundError as err: + raise StreamWorkerError( + f"Not Found error opening stream ({redact_av_error_string(err)})", + error_code=StreamClientError.NotFound, + ) from err + + except av.FFmpegError as err: + raise StreamWorkerError( + f"Error opening stream ({redact_av_error_string(err)})" + ) from err + + def stream_worker( source: str, pyav_options: dict[str, str], @@ -526,12 +562,7 @@ def stream_worker( # the stimeout option was renamed to timeout as of ffmpeg 5.0 pyav_options["timeout"] = pyav_options["stimeout"] del pyav_options["stimeout"] - try: - container = av.open(source, options=pyav_options, timeout=SOURCE_TIMEOUT) - except av.FFmpegError as err: - raise StreamWorkerError( - f"Error opening stream ({redact_av_error_string(err)})" - ) from err + container = try_open_stream(source, pyav_options) try: video_stream = container.streams.video[0] except (KeyError, IndexError) as ex: diff --git a/homeassistant/components/tplink/camera.py b/homeassistant/components/tplink/camera.py index 4a6859a8414..01b47db7082 100644 --- a/homeassistant/components/tplink/camera.py +++ b/homeassistant/components/tplink/camera.py @@ -130,7 +130,7 @@ class TPLinkCameraEntity(CoordinatedTPLinkEntity, Camera): try: await stream.async_check_stream_client_error(self.hass, video_url) except stream.StreamOpenClientError as ex: - if ex.stream_client_error is stream.StreamClientError.Unauthorized: + if ex.error_code is stream.StreamClientError.Unauthorized: _LOGGER.debug( "Camera stream failed authentication for %s", self._device.host, diff --git a/homeassistant/components/tplink/config_flow.py b/homeassistant/components/tplink/config_flow.py index db6f9a58ba5..9bc278f8948 100644 --- a/homeassistant/components/tplink/config_flow.py +++ b/homeassistant/components/tplink/config_flow.py @@ -468,7 +468,7 @@ class TPLinkConfigFlow(ConfigFlow, domain=DOMAIN): try: await stream.async_check_stream_client_error(self.hass, rtsp_url) except stream.StreamOpenClientError as ex: - if ex.stream_client_error is stream.StreamClientError.Unauthorized: + if ex.error_code is stream.StreamClientError.Unauthorized: errors["base"] = "invalid_camera_auth" else: _LOGGER.debug( diff --git a/tests/components/generic/test_config_flow.py b/tests/components/generic/test_config_flow.py index f121b210c0c..7cc0b3d4a6c 100644 --- a/tests/components/generic/test_config_flow.py +++ b/tests/components/generic/test_config_flow.py @@ -30,7 +30,6 @@ from homeassistant.components.stream import ( CONF_RTSP_TRANSPORT, CONF_USE_WALLCLOCK_AS_TIMESTAMPS, ) -from homeassistant.components.stream.worker import StreamWorkerError from homeassistant.config_entries import ConfigEntryState, ConfigFlowResult from homeassistant.const import ( CONF_AUTHENTICATION, @@ -646,25 +645,6 @@ async def test_form_stream_other_error(hass: HomeAssistant, user_flow) -> None: await hass.async_block_till_done() -@respx.mock -@pytest.mark.usefixtures("fakeimg_png") -async def test_form_stream_worker_error( - hass: HomeAssistant, user_flow: ConfigFlowResult -) -> None: - """Test we handle a StreamWorkerError and pass the message through.""" - with patch( - "homeassistant.components.generic.config_flow.create_stream", - side_effect=StreamWorkerError("Some message"), - ): - result2 = await hass.config_entries.flow.async_configure( - user_flow["flow_id"], - TESTDATA, - ) - assert result2["type"] is FlowResultType.FORM - assert result2["errors"] == {"stream_source": "unknown_with_details"} - assert result2["description_placeholders"] == {"error": "Some message"} - - @respx.mock async def test_form_stream_permission_error( hass: HomeAssistant, fakeimgbytes_png: bytes, user_flow: ConfigFlowResult @@ -905,23 +885,22 @@ async def test_options_only_stream( @respx.mock @pytest.mark.usefixtures("fakeimg_png") -async def test_form_options_stream_worker_error( +async def test_form_options_permission_error( hass: HomeAssistant, config_entry: MockConfigEntry ) -> None: - """Test we handle a StreamWorkerError and pass the message through.""" + """Test we handle a PermissionError and pass the message through.""" result = await hass.config_entries.options.async_init(config_entry.entry_id) with patch( "homeassistant.components.generic.config_flow.create_stream", - side_effect=StreamWorkerError("Some message"), + side_effect=PermissionError("Some message"), ): result2 = await hass.config_entries.options.async_configure( result["flow_id"], TESTDATA, ) assert result2["type"] is FlowResultType.FORM - assert result2["errors"] == {"stream_source": "unknown_with_details"} - assert result2["description_placeholders"] == {"error": "Some message"} + assert result2["errors"] == {"stream_source": "stream_not_permitted"} @pytest.mark.usefixtures("fakeimg_png") diff --git a/tests/components/stream/test_hls.py b/tests/components/stream/test_hls.py index babd7c0b748..9ce297c3fb6 100644 --- a/tests/components/stream/test_hls.py +++ b/tests/components/stream/test_hls.py @@ -278,8 +278,19 @@ async def test_stream_timeout_after_stop( await hass.async_block_till_done() +@pytest.mark.parametrize( + ("exception"), + [ + # pylint: disable-next=c-extension-no-member + (av.error.InvalidDataError(-2, "error")), + (av.HTTPBadRequestError(500, "error")), + ], +) async def test_stream_retries( - hass: HomeAssistant, setup_component, should_retry + hass: HomeAssistant, + setup_component, + should_retry, + exception, ) -> None: """Test hls stream is retried on failure.""" # Setup demo HLS track @@ -309,8 +320,7 @@ async def test_stream_retries( def av_open_side_effect(*args, **kwargs): hass.loop.call_soon_threadsafe(futures.pop().set_result, None) - # pylint: disable-next=c-extension-no-member - raise av.error.InvalidDataError(-2, "error") + raise exception with ( patch("av.open") as av_open, diff --git a/tests/components/stream/test_init.py b/tests/components/stream/test_init.py index 5f9d305620d..ba5f8d2e70f 100644 --- a/tests/components/stream/test_init.py +++ b/tests/components/stream/test_init.py @@ -1,24 +1,41 @@ """Test stream init.""" import logging +from typing import Any from unittest.mock import MagicMock, patch import av import pytest from homeassistant.components.stream import ( - CONF_PREFER_TCP, SOURCE_TIMEOUT, StreamClientError, StreamOpenClientError, __name__ as stream_name, - _async_try_open_stream, async_check_stream_client_error, + create_stream, ) +from homeassistant.components.stream.const import ATTR_PREFER_TCP from homeassistant.const import EVENT_LOGGING_CHANGED from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError from homeassistant.setup import async_setup_component +from .common import dynamic_stream_settings + + +async def test_stream_not_setup(hass: HomeAssistant, h264_video) -> None: + """Test hls stream. + + Purposefully not mocking anything here to test full + integration with the stream component. + """ + with pytest.raises(HomeAssistantError, match="Stream integration is not set up"): + create_stream(hass, "rtsp://foobar", {}, dynamic_stream_settings()) + + with pytest.raises(HomeAssistantError, match="Stream integration is not set up"): + await async_check_stream_client_error(hass, "rtsp://foobar") + async def test_log_levels( hass: HomeAssistant, caplog: pytest.LogCaptureFixture @@ -66,6 +83,7 @@ async def test_log_levels( async def test_check_open_stream_params(hass: HomeAssistant) -> None: """Test check open stream params.""" + await async_setup_component(hass, "stream", {"stream": {}}) container_mock = MagicMock() source = "rtsp://foobar" @@ -74,24 +92,19 @@ async def test_check_open_stream_params(hass: HomeAssistant) -> None: await async_check_stream_client_error(hass, source) options = { - "rtsp_flags": CONF_PREFER_TCP, - "timeout": str(SOURCE_TIMEOUT), + "rtsp_flags": ATTR_PREFER_TCP, + "stimeout": "5000000", } - open_mock.assert_called_once_with(source, options=options, timeout=5) + open_mock.assert_called_once_with(source, options=options, timeout=SOURCE_TIMEOUT) container_mock.close.assert_called_once() container_mock.reset_mock() - with patch("av.open", return_value=container_mock) as open_mock: + with ( + patch("av.open", return_value=container_mock) as open_mock, + pytest.raises(HomeAssistantError, match="Invalid stream options"), + ): await async_check_stream_client_error(hass, source, {"foo": "bar"}) - options = { - "rtsp_flags": CONF_PREFER_TCP, - "timeout": str(SOURCE_TIMEOUT), - "foo": "bar", - } - open_mock.assert_called_once_with(source, options=options, timeout=5) - container_mock.close.assert_called_once() - @pytest.mark.parametrize( ("error", "enum_result"), @@ -121,13 +134,56 @@ async def test_try_open_stream_error( hass: HomeAssistant, error: av.HTTPClientError, enum_result: StreamClientError ) -> None: """Test trying to open a stream.""" - oc_error: StreamOpenClientError | None = None + await async_setup_component(hass, "stream", {"stream": {}}) - with patch("av.open", side_effect=error): - try: - await _async_try_open_stream(hass, "rtsp://foobar") - except StreamOpenClientError as ex: - oc_error = ex + with ( + patch("av.open", side_effect=error), + pytest.raises(StreamOpenClientError) as ex, + ): + await async_check_stream_client_error(hass, "rtsp://foobar") + assert ex.value.error_code is enum_result - assert oc_error - assert oc_error.stream_client_error is enum_result + +@pytest.mark.parametrize( + ("options", "expected_pyav_options"), + [ + ( + {}, + {"rtsp_flags": "prefer_tcp", "stimeout": "5000000"}, + ), + ( + {"rtsp_transport": "udp"}, + { + "rtsp_flags": "prefer_tcp", + "rtsp_transport": "udp", + "stimeout": "5000000", + }, + ), + ( + {"use_wallclock_as_timestamps": True}, + { + "rtsp_flags": "prefer_tcp", + "stimeout": "5000000", + "use_wallclock_as_timestamps": "1", + }, + ), + ], +) +async def test_convert_stream_options( + hass: HomeAssistant, + options: dict[str, Any], + expected_pyav_options: dict[str, Any], +) -> None: + """Test stream options.""" + await async_setup_component(hass, "stream", {"stream": {}}) + + container_mock = MagicMock() + source = "rtsp://foobar" + + with patch("av.open", return_value=container_mock) as open_mock: + await async_check_stream_client_error(hass, source, options) + + open_mock.assert_called_once_with( + source, options=expected_pyav_options, timeout=SOURCE_TIMEOUT + ) + container_mock.close.assert_called_once() diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index 73c51087ef1..2be972cc6a2 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -41,6 +41,7 @@ from homeassistant.components.stream.const import ( TARGET_SEGMENT_DURATION_NON_LL_HLS, ) from homeassistant.components.stream.core import Orientation, StreamSettings +from homeassistant.components.stream.exceptions import StreamClientError from homeassistant.components.stream.worker import ( StreamEndedError, StreamState, @@ -341,7 +342,18 @@ async def async_decode_stream( return py_av.capture_buffer -async def test_stream_open_fails(hass: HomeAssistant) -> None: +@pytest.mark.parametrize( + ("exception", "error_code"), + [ + # pylint: disable-next=c-extension-no-member + (av.error.InvalidDataError(-2, "error"), StreamClientError.Other), + (av.HTTPBadRequestError(400, ""), StreamClientError.BadRequest), + (av.HTTPUnauthorizedError(401, ""), StreamClientError.Unauthorized), + ], +) +async def test_stream_open_fails( + hass: HomeAssistant, exception: Exception, error_code: StreamClientError +) -> None: """Test failure on stream open.""" stream = Stream( hass, @@ -352,12 +364,11 @@ async def test_stream_open_fails(hass: HomeAssistant) -> None: ) stream.add_provider(HLS_PROVIDER) with patch("av.open") as av_open: - # pylint: disable-next=c-extension-no-member - av_open.side_effect = av.error.InvalidDataError(-2, "error") - with pytest.raises(StreamWorkerError): + av_open.side_effect = exception + with pytest.raises(StreamWorkerError) as err: run_worker(hass, stream, STREAM_SOURCE) - await hass.async_block_till_done() av_open.assert_called_once() + assert err.value.error_code == error_code async def test_stream_worker_success(hass: HomeAssistant) -> None: diff --git a/tests/components/tplink/test_camera.py b/tests/components/tplink/test_camera.py index 8ca56a84b6b..aa83ae659fb 100644 --- a/tests/components/tplink/test_camera.py +++ b/tests/components/tplink/test_camera.py @@ -346,7 +346,8 @@ async def test_camera_image_auth_error( patch( "homeassistant.components.stream.async_check_stream_client_error", side_effect=stream.StreamOpenClientError( - stream_client_error=stream.StreamClientError.Unauthorized + "Request was unauthorized", + error_code=stream.StreamClientError.Unauthorized, ), ), pytest.raises(HomeAssistantError), diff --git a/tests/components/tplink/test_config_flow.py b/tests/components/tplink/test_config_flow.py index 980fd0a3f51..14f1260e2ec 100644 --- a/tests/components/tplink/test_config_flow.py +++ b/tests/components/tplink/test_config_flow.py @@ -768,7 +768,7 @@ async def test_manual_camera( patch( "homeassistant.components.stream.async_check_stream_client_error", side_effect=stream.StreamOpenClientError( - stream_client_error=stream.StreamClientError.NotFound + "Stream was not found", error_code=stream.StreamClientError.NotFound ), ), ): @@ -791,7 +791,8 @@ async def test_manual_camera( patch( "homeassistant.components.stream.async_check_stream_client_error", side_effect=stream.StreamOpenClientError( - stream_client_error=stream.StreamClientError.Unauthorized + "Request is unauthorized", + error_code=stream.StreamClientError.Unauthorized, ), ), ): @@ -835,7 +836,7 @@ async def test_manual_camera( [ pytest.param( stream.StreamOpenClientError( - stream_client_error=stream.StreamClientError.NotFound + "Stream was not found", error_code=stream.StreamClientError.NotFound ), id="open_client_error", ),