From 0dee4f85f0ffd863f66a966abee9486408193f65 Mon Sep 17 00:00:00 2001 From: Allen Porter Date: Fri, 31 Dec 2021 13:44:33 -0800 Subject: [PATCH] Implement an rtsp to webrtc registry in camera (#62962) * Implement a webrtc to rtsp support in camera as a registry Allow integrations to register a provider that can convert an RTSP stream and WebRTC offer to a WebRTC answer. This is planned to be used by the RTSPtoWebRTC server integration as an initial pass, but could support other server implementations as well (or even native implementationf or that matter). * Fix test bug to improve test covergae and restructure statements * Add missing call to refresh webrtc providers * Run provider refresh in parallel since it may send RPCs * Replace for loop with any * Fix pylint warning to use a generator --- homeassistant/components/camera/__init__.py | 98 ++++++++- homeassistant/components/camera/const.py | 1 + tests/components/camera/test_init.py | 217 ++++++++++++++++++-- 3 files changed, 299 insertions(+), 17 deletions(-) diff --git a/homeassistant/components/camera/__init__.py b/homeassistant/components/camera/__init__.py index 5c81ece6141..cef5326897e 100644 --- a/homeassistant/components/camera/__init__.py +++ b/homeassistant/components/camera/__init__.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio import base64 import collections -from collections.abc import Awaitable, Callable, Mapping +from collections.abc import Awaitable, Callable, Iterable, Mapping from contextlib import suppress from dataclasses import dataclass from datetime import datetime, timedelta @@ -61,6 +61,7 @@ from .const import ( CONF_DURATION, CONF_LOOKBACK, DATA_CAMERA_PREFS, + DATA_RTSP_TO_WEB_RTC, DOMAIN, SERVICE_RECORD, STREAM_TYPE_HLS, @@ -93,6 +94,8 @@ STATE_IDLE: Final = "idle" SUPPORT_ON_OFF: Final = 1 SUPPORT_STREAM: Final = 2 +RTSP_PREFIXES = {"rtsp://", "rtsps://"} + DEFAULT_CONTENT_TYPE: Final = "image/jpeg" ENTITY_IMAGE_URL: Final = "/api/camera_proxy/{0}?token={1}" @@ -284,6 +287,55 @@ def _get_camera_from_entity_id(hass: HomeAssistant, entity_id: str) -> Camera: return cast(Camera, camera) +def async_register_rtsp_to_web_rtc_provider( + hass: HomeAssistant, + domain: str, + provider: Callable[[str, str], Awaitable[str | None]], +) -> Callable[[], None]: + """Register an RTSP to WebRTC provider. + + Integrations may register a Callable that accepts a `stream_source` and + SDP `offer` as an input, and the output is the SDP `answer`. An implementation + may return None if the source or offer is not eligible or throw HomeAssistantError + on failure. The first provider to satisfy the offer will be used. + """ + if DOMAIN not in hass.data: + raise ValueError("Unexpected state, camera not loaded") + + def remove_provider() -> None: + if domain in hass.data[DATA_RTSP_TO_WEB_RTC]: + del hass.data[DATA_RTSP_TO_WEB_RTC] + hass.async_create_task(_async_refresh_providers(hass)) + + hass.data.setdefault(DATA_RTSP_TO_WEB_RTC, {}) + hass.data[DATA_RTSP_TO_WEB_RTC][domain] = provider + hass.async_create_task(_async_refresh_providers(hass)) + return remove_provider + + +async def _async_refresh_providers(hass: HomeAssistant) -> None: + """Check all cameras for any state changes for registered providers.""" + + async def _refresh(camera: Camera) -> None: + if await camera.async_refresh_providers(): + camera.async_write_ha_state() + + component: EntityComponent = hass.data[DOMAIN] + await asyncio.gather( + *(_refresh(cast(Camera, camera)) for camera in component.entities) + ) + + +def _async_get_rtsp_to_web_rtc_providers( + hass: HomeAssistant, +) -> Iterable[Callable[[str, str], Awaitable[str | None]]]: + """Return registered RTSP to WebRTC providers.""" + providers: dict[str, Callable[[str, str], Awaitable[str | None]]] = hass.data.get( + DATA_RTSP_TO_WEB_RTC, {} + ) + return providers.values() + + async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the camera component.""" component = hass.data[DOMAIN] = EntityComponent( @@ -391,6 +443,7 @@ class Camera(Entity): self._warned_old_signature = False self.async_update_token() self._create_stream_lock: asyncio.Lock | None = None + self._rtsp_to_webrtc = False @property def entity_picture(self) -> str: @@ -446,6 +499,8 @@ class Camera(Entity): return self._attr_frontend_stream_type if not self.supported_features & SUPPORT_STREAM: return None + if self._rtsp_to_webrtc: + return STREAM_TYPE_WEB_RTC return STREAM_TYPE_HLS @property @@ -487,8 +542,17 @@ class Camera(Entity): """Handle the WebRTC offer and return an answer. This is used by cameras with SUPPORT_STREAM and STREAM_TYPE_WEB_RTC. + + Integrations can override with a native WebRTC implementation. """ - raise NotImplementedError() + stream_source = await self.stream_source() + if not stream_source: + return None + for provider in _async_get_rtsp_to_web_rtc_providers(self.hass): + answer_sdp = await provider(stream_source, offer_sdp) + if answer_sdp: + return answer_sdp + raise HomeAssistantError("WebRTC offer was not accepted by any providers") def camera_image( self, width: int | None = None, height: int | None = None @@ -614,6 +678,36 @@ class Camera(Entity): hashlib.sha256(_RND.getrandbits(256).to_bytes(32, "little")).hexdigest() ) + async def async_internal_added_to_hass(self) -> None: + """Run when entity about to be added to hass.""" + await super().async_internal_added_to_hass() + # Note: State is always updated by entity on return + await self.async_refresh_providers() + + async def async_refresh_providers(self) -> bool: + """Determine if any of the registered providers are suitable for this entity. + + This affects state attributes, so it should be invoked any time the registered + providers or inputs to the state attributes change. + + Returns True if any state was updated (and needs to be written) + """ + old_state = self._rtsp_to_webrtc + self._rtsp_to_webrtc = await self._async_use_rtsp_to_webrtc() + return old_state != self._rtsp_to_webrtc + + async def _async_use_rtsp_to_webrtc(self) -> bool: + """Determine if a WebRTC provider can be used for the camera.""" + if not self.supported_features & SUPPORT_STREAM: + return False + if DATA_RTSP_TO_WEB_RTC not in self.hass.data: + return False + stream_source = await self.stream_source() + return any( + stream_source and stream_source.startswith(prefix) + for prefix in RTSP_PREFIXES + ) + class CameraView(HomeAssistantView): """Base CameraView.""" diff --git a/homeassistant/components/camera/const.py b/homeassistant/components/camera/const.py index 3eb131200e6..cef8773d974 100644 --- a/homeassistant/components/camera/const.py +++ b/homeassistant/components/camera/const.py @@ -4,6 +4,7 @@ from typing import Final DOMAIN: Final = "camera" DATA_CAMERA_PREFS: Final = "camera_prefs" +DATA_RTSP_TO_WEB_RTC: Final = "rtsp_to_web_rtc" PREF_PRELOAD_STREAM: Final = "preload_stream" diff --git a/tests/components/camera/test_init.py b/tests/components/camera/test_init.py index 13ec69bcd4e..da7cd946f22 100644 --- a/tests/components/camera/test_init.py +++ b/tests/components/camera/test_init.py @@ -28,6 +28,11 @@ from .common import EMPTY_8_6_JPEG, mock_turbo_jpeg from tests.components.camera import common +STREAM_SOURCE = "rtsp://127.0.0.1/stream" +HLS_STREAM_SOURCE = "http://127.0.0.1/example.m3u" +WEBRTC_OFFER = "v=0\r\n" +WEBRTC_ANSWER = "a=sendonly" + @pytest.fixture(name="mock_camera") async def mock_camera_fixture(hass): @@ -57,7 +62,7 @@ async def mock_camera_web_rtc_fixture(hass): new_callable=PropertyMock(return_value=STREAM_TYPE_WEB_RTC), ), patch( "homeassistant.components.camera.Camera.async_handle_web_rtc_offer", - return_value="a=sendonly", + return_value=WEBRTC_ANSWER, ): yield @@ -85,6 +90,50 @@ async def image_mock_url_fixture(hass): await hass.async_block_till_done() +@pytest.fixture(name="mock_stream_source") +async def mock_stream_source_fixture(): + """Fixture to create an RTSP stream source.""" + with patch( + "homeassistant.components.camera.Camera.stream_source", + return_value=STREAM_SOURCE, + ) as mock_stream_source, patch( + "homeassistant.components.camera.Camera.supported_features", + return_value=camera.SUPPORT_STREAM, + ): + yield mock_stream_source + + +@pytest.fixture(name="mock_hls_stream_source") +async def mock_hls_stream_source_fixture(): + """Fixture to create an HLS stream source.""" + with patch( + "homeassistant.components.camera.Camera.stream_source", + return_value=HLS_STREAM_SOURCE, + ) as mock_hls_stream_source, patch( + "homeassistant.components.camera.Camera.supported_features", + return_value=camera.SUPPORT_STREAM, + ): + yield mock_hls_stream_source + + +async def provide_web_rtc_answer(stream_source: str, offer: str) -> str: + """Simulate an rtsp to webrtc provider.""" + assert stream_source == STREAM_SOURCE + assert offer == WEBRTC_OFFER + return WEBRTC_ANSWER + + +@pytest.fixture(name="mock_rtsp_to_web_rtc") +async def mock_rtsp_to_web_rtc_fixture(hass): + """Fixture that registers a mock rtsp to web_rtc provider.""" + mock_provider = Mock(side_effect=provide_web_rtc_answer) + unsub = camera.async_register_rtsp_to_web_rtc_provider( + hass, "mock_domain", mock_provider + ) + yield mock_provider + unsub() + + async def test_get_image_from_camera(hass, image_mock_url): """Grab an image from camera entity.""" @@ -189,17 +238,13 @@ async def test_get_image_from_camera_not_jpeg(hass, image_mock_url): assert image.content == b"png" -async def test_get_stream_source_from_camera(hass, mock_camera): +async def test_get_stream_source_from_camera(hass, mock_camera, mock_stream_source): """Fetch stream source from camera entity.""" - with patch( - "homeassistant.components.camera.Camera.stream_source", - return_value="rtsp://127.0.0.1/stream", - ) as mock_camera_stream_source: - stream_source = await camera.async_get_stream_source(hass, "camera.demo_camera") + stream_source = await camera.async_get_stream_source(hass, "camera.demo_camera") - assert mock_camera_stream_source.called - assert stream_source == "rtsp://127.0.0.1/stream" + assert mock_stream_source.called + assert stream_source == STREAM_SOURCE async def test_get_image_without_exists_camera(hass, image_mock_url): @@ -503,7 +548,7 @@ async def test_websocket_web_rtc_offer( "id": 9, "type": "camera/web_rtc_offer", "entity_id": "camera.demo_camera", - "offer": "v=0\r\n", + "offer": WEBRTC_OFFER, } ) response = await client.receive_json() @@ -511,7 +556,7 @@ async def test_websocket_web_rtc_offer( assert response["id"] == 9 assert response["type"] == TYPE_RESULT assert response["success"] - assert response["result"]["answer"] == "a=sendonly" + assert response["result"]["answer"] == WEBRTC_ANSWER async def test_websocket_web_rtc_offer_invalid_entity( @@ -526,7 +571,7 @@ async def test_websocket_web_rtc_offer_invalid_entity( "id": 9, "type": "camera/web_rtc_offer", "entity_id": "camera.does_not_exist", - "offer": "v=0\r\n", + "offer": WEBRTC_OFFER, } ) response = await client.receive_json() @@ -575,7 +620,7 @@ async def test_websocket_web_rtc_offer_failure( "id": 9, "type": "camera/web_rtc_offer", "entity_id": "camera.demo_camera", - "offer": "v=0\r\n", + "offer": WEBRTC_OFFER, } ) response = await client.receive_json() @@ -604,7 +649,7 @@ async def test_websocket_web_rtc_offer_timeout( "id": 9, "type": "camera/web_rtc_offer", "entity_id": "camera.demo_camera", - "offer": "v=0\r\n", + "offer": WEBRTC_OFFER, } ) response = await client.receive_json() @@ -628,7 +673,7 @@ async def test_websocket_web_rtc_offer_invalid_stream_type( "id": 9, "type": "camera/web_rtc_offer", "entity_id": "camera.demo_camera", - "offer": "v=0\r\n", + "offer": WEBRTC_OFFER, } ) response = await client.receive_json() @@ -690,3 +735,145 @@ async def test_stream_unavailable(hass, hass_ws_client, mock_camera, mock_stream demo_camera = hass.states.get("camera.demo_camera") assert demo_camera is not None assert demo_camera.state == camera.STATE_STREAMING + + +async def test_rtsp_to_web_rtc_offer( + hass, + hass_ws_client, + mock_camera, + mock_stream_source, + mock_rtsp_to_web_rtc, +): + """Test creating a web_rtc offer from an rstp provider.""" + client = await hass_ws_client(hass) + await client.send_json( + { + "id": 9, + "type": "camera/web_rtc_offer", + "entity_id": "camera.demo_camera", + "offer": WEBRTC_OFFER, + } + ) + response = await client.receive_json() + + assert response.get("id") == 9 + assert response.get("type") == TYPE_RESULT + assert response.get("success") + assert "result" in response + assert response["result"] == {"answer": WEBRTC_ANSWER} + + assert mock_rtsp_to_web_rtc.called + + +async def test_unsupported_rtsp_to_web_rtc_stream_type( + hass, + hass_ws_client, + mock_camera, + mock_hls_stream_source, # Not an RTSP stream source + mock_rtsp_to_web_rtc, +): + """Test rtsp-to-webrtc is not registered for non-RTSP streams.""" + client = await hass_ws_client(hass) + await client.send_json( + { + "id": 10, + "type": "camera/web_rtc_offer", + "entity_id": "camera.demo_camera", + "offer": WEBRTC_OFFER, + } + ) + response = await client.receive_json() + + assert response.get("id") == 10 + assert response.get("type") == TYPE_RESULT + assert "success" in response + assert not response["success"] + + +async def test_rtsp_to_web_rtc_provider_unregistered( + hass, + hass_ws_client, + mock_camera, + mock_stream_source, +): + """Test creating a web_rtc offer from an rstp provider.""" + mock_provider = Mock(side_effect=provide_web_rtc_answer) + unsub = camera.async_register_rtsp_to_web_rtc_provider( + hass, "mock_domain", mock_provider + ) + + client = await hass_ws_client(hass) + + # Registered provider can handle the WebRTC offer + await client.send_json( + { + "id": 11, + "type": "camera/web_rtc_offer", + "entity_id": "camera.demo_camera", + "offer": WEBRTC_OFFER, + } + ) + response = await client.receive_json() + assert response["id"] == 11 + assert response["type"] == TYPE_RESULT + assert response["success"] + assert response["result"]["answer"] == WEBRTC_ANSWER + + assert mock_provider.called + mock_provider.reset_mock() + + # Unregister provider, then verify the WebRTC offer cannot be handled + unsub() + await client.send_json( + { + "id": 12, + "type": "camera/web_rtc_offer", + "entity_id": "camera.demo_camera", + "offer": WEBRTC_OFFER, + } + ) + response = await client.receive_json() + assert response.get("id") == 12 + assert response.get("type") == TYPE_RESULT + assert "success" in response + assert not response["success"] + + assert not mock_provider.called + + +async def test_rtsp_to_web_rtc_offer_not_accepted( + hass, + hass_ws_client, + mock_camera, + mock_stream_source, +): + """Test a provider that can't satisfy the rtsp to webrtc offer.""" + + async def provide_none(stream_source: str, offer: str) -> str: + """Simulate a provider that can't accept the offer.""" + return None + + mock_provider = Mock(side_effect=provide_none) + unsub = camera.async_register_rtsp_to_web_rtc_provider( + hass, "mock_domain", mock_provider + ) + client = await hass_ws_client(hass) + + # Registered provider can handle the WebRTC offer + await client.send_json( + { + "id": 11, + "type": "camera/web_rtc_offer", + "entity_id": "camera.demo_camera", + "offer": WEBRTC_OFFER, + } + ) + response = await client.receive_json() + assert response["id"] == 11 + assert response.get("type") == TYPE_RESULT + assert "success" in response + assert not response["success"] + + assert mock_provider.called + + unsub()