Implement an rtsp to webrtc registry in camera (#62962)

* Implement a webrtc to rtsp support in camera as a registry

Allow integrations to register a provider that can convert an RTSP stream and WebRTC offer to a WebRTC answer. This is
planned to be used by the RTSPtoWebRTC server integration as an initial pass, but could
support other server implementations as well (or even native implementationf or that matter).

* Fix test bug to improve test covergae and restructure statements

* Add missing call to refresh webrtc providers

* Run provider refresh in parallel since it may send RPCs

* Replace for loop with any

* Fix pylint warning to use a generator
This commit is contained in:
Allen Porter 2021-12-31 13:44:33 -08:00 committed by GitHub
parent 0de3a299d6
commit 0dee4f85f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 299 additions and 17 deletions

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import asyncio
import base64
import collections
from collections.abc import Awaitable, Callable, Mapping
from collections.abc import Awaitable, Callable, Iterable, Mapping
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
@ -61,6 +61,7 @@ from .const import (
CONF_DURATION,
CONF_LOOKBACK,
DATA_CAMERA_PREFS,
DATA_RTSP_TO_WEB_RTC,
DOMAIN,
SERVICE_RECORD,
STREAM_TYPE_HLS,
@ -93,6 +94,8 @@ STATE_IDLE: Final = "idle"
SUPPORT_ON_OFF: Final = 1
SUPPORT_STREAM: Final = 2
RTSP_PREFIXES = {"rtsp://", "rtsps://"}
DEFAULT_CONTENT_TYPE: Final = "image/jpeg"
ENTITY_IMAGE_URL: Final = "/api/camera_proxy/{0}?token={1}"
@ -284,6 +287,55 @@ def _get_camera_from_entity_id(hass: HomeAssistant, entity_id: str) -> Camera:
return cast(Camera, camera)
def async_register_rtsp_to_web_rtc_provider(
hass: HomeAssistant,
domain: str,
provider: Callable[[str, str], Awaitable[str | None]],
) -> Callable[[], None]:
"""Register an RTSP to WebRTC provider.
Integrations may register a Callable that accepts a `stream_source` and
SDP `offer` as an input, and the output is the SDP `answer`. An implementation
may return None if the source or offer is not eligible or throw HomeAssistantError
on failure. The first provider to satisfy the offer will be used.
"""
if DOMAIN not in hass.data:
raise ValueError("Unexpected state, camera not loaded")
def remove_provider() -> None:
if domain in hass.data[DATA_RTSP_TO_WEB_RTC]:
del hass.data[DATA_RTSP_TO_WEB_RTC]
hass.async_create_task(_async_refresh_providers(hass))
hass.data.setdefault(DATA_RTSP_TO_WEB_RTC, {})
hass.data[DATA_RTSP_TO_WEB_RTC][domain] = provider
hass.async_create_task(_async_refresh_providers(hass))
return remove_provider
async def _async_refresh_providers(hass: HomeAssistant) -> None:
"""Check all cameras for any state changes for registered providers."""
async def _refresh(camera: Camera) -> None:
if await camera.async_refresh_providers():
camera.async_write_ha_state()
component: EntityComponent = hass.data[DOMAIN]
await asyncio.gather(
*(_refresh(cast(Camera, camera)) for camera in component.entities)
)
def _async_get_rtsp_to_web_rtc_providers(
hass: HomeAssistant,
) -> Iterable[Callable[[str, str], Awaitable[str | None]]]:
"""Return registered RTSP to WebRTC providers."""
providers: dict[str, Callable[[str, str], Awaitable[str | None]]] = hass.data.get(
DATA_RTSP_TO_WEB_RTC, {}
)
return providers.values()
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the camera component."""
component = hass.data[DOMAIN] = EntityComponent(
@ -391,6 +443,7 @@ class Camera(Entity):
self._warned_old_signature = False
self.async_update_token()
self._create_stream_lock: asyncio.Lock | None = None
self._rtsp_to_webrtc = False
@property
def entity_picture(self) -> str:
@ -446,6 +499,8 @@ class Camera(Entity):
return self._attr_frontend_stream_type
if not self.supported_features & SUPPORT_STREAM:
return None
if self._rtsp_to_webrtc:
return STREAM_TYPE_WEB_RTC
return STREAM_TYPE_HLS
@property
@ -487,8 +542,17 @@ class Camera(Entity):
"""Handle the WebRTC offer and return an answer.
This is used by cameras with SUPPORT_STREAM and STREAM_TYPE_WEB_RTC.
Integrations can override with a native WebRTC implementation.
"""
raise NotImplementedError()
stream_source = await self.stream_source()
if not stream_source:
return None
for provider in _async_get_rtsp_to_web_rtc_providers(self.hass):
answer_sdp = await provider(stream_source, offer_sdp)
if answer_sdp:
return answer_sdp
raise HomeAssistantError("WebRTC offer was not accepted by any providers")
def camera_image(
self, width: int | None = None, height: int | None = None
@ -614,6 +678,36 @@ class Camera(Entity):
hashlib.sha256(_RND.getrandbits(256).to_bytes(32, "little")).hexdigest()
)
async def async_internal_added_to_hass(self) -> None:
"""Run when entity about to be added to hass."""
await super().async_internal_added_to_hass()
# Note: State is always updated by entity on return
await self.async_refresh_providers()
async def async_refresh_providers(self) -> bool:
"""Determine if any of the registered providers are suitable for this entity.
This affects state attributes, so it should be invoked any time the registered
providers or inputs to the state attributes change.
Returns True if any state was updated (and needs to be written)
"""
old_state = self._rtsp_to_webrtc
self._rtsp_to_webrtc = await self._async_use_rtsp_to_webrtc()
return old_state != self._rtsp_to_webrtc
async def _async_use_rtsp_to_webrtc(self) -> bool:
"""Determine if a WebRTC provider can be used for the camera."""
if not self.supported_features & SUPPORT_STREAM:
return False
if DATA_RTSP_TO_WEB_RTC not in self.hass.data:
return False
stream_source = await self.stream_source()
return any(
stream_source and stream_source.startswith(prefix)
for prefix in RTSP_PREFIXES
)
class CameraView(HomeAssistantView):
"""Base CameraView."""

View File

@ -4,6 +4,7 @@ from typing import Final
DOMAIN: Final = "camera"
DATA_CAMERA_PREFS: Final = "camera_prefs"
DATA_RTSP_TO_WEB_RTC: Final = "rtsp_to_web_rtc"
PREF_PRELOAD_STREAM: Final = "preload_stream"

View File

@ -28,6 +28,11 @@ from .common import EMPTY_8_6_JPEG, mock_turbo_jpeg
from tests.components.camera import common
STREAM_SOURCE = "rtsp://127.0.0.1/stream"
HLS_STREAM_SOURCE = "http://127.0.0.1/example.m3u"
WEBRTC_OFFER = "v=0\r\n"
WEBRTC_ANSWER = "a=sendonly"
@pytest.fixture(name="mock_camera")
async def mock_camera_fixture(hass):
@ -57,7 +62,7 @@ async def mock_camera_web_rtc_fixture(hass):
new_callable=PropertyMock(return_value=STREAM_TYPE_WEB_RTC),
), patch(
"homeassistant.components.camera.Camera.async_handle_web_rtc_offer",
return_value="a=sendonly",
return_value=WEBRTC_ANSWER,
):
yield
@ -85,6 +90,50 @@ async def image_mock_url_fixture(hass):
await hass.async_block_till_done()
@pytest.fixture(name="mock_stream_source")
async def mock_stream_source_fixture():
"""Fixture to create an RTSP stream source."""
with patch(
"homeassistant.components.camera.Camera.stream_source",
return_value=STREAM_SOURCE,
) as mock_stream_source, patch(
"homeassistant.components.camera.Camera.supported_features",
return_value=camera.SUPPORT_STREAM,
):
yield mock_stream_source
@pytest.fixture(name="mock_hls_stream_source")
async def mock_hls_stream_source_fixture():
"""Fixture to create an HLS stream source."""
with patch(
"homeassistant.components.camera.Camera.stream_source",
return_value=HLS_STREAM_SOURCE,
) as mock_hls_stream_source, patch(
"homeassistant.components.camera.Camera.supported_features",
return_value=camera.SUPPORT_STREAM,
):
yield mock_hls_stream_source
async def provide_web_rtc_answer(stream_source: str, offer: str) -> str:
"""Simulate an rtsp to webrtc provider."""
assert stream_source == STREAM_SOURCE
assert offer == WEBRTC_OFFER
return WEBRTC_ANSWER
@pytest.fixture(name="mock_rtsp_to_web_rtc")
async def mock_rtsp_to_web_rtc_fixture(hass):
"""Fixture that registers a mock rtsp to web_rtc provider."""
mock_provider = Mock(side_effect=provide_web_rtc_answer)
unsub = camera.async_register_rtsp_to_web_rtc_provider(
hass, "mock_domain", mock_provider
)
yield mock_provider
unsub()
async def test_get_image_from_camera(hass, image_mock_url):
"""Grab an image from camera entity."""
@ -189,17 +238,13 @@ async def test_get_image_from_camera_not_jpeg(hass, image_mock_url):
assert image.content == b"png"
async def test_get_stream_source_from_camera(hass, mock_camera):
async def test_get_stream_source_from_camera(hass, mock_camera, mock_stream_source):
"""Fetch stream source from camera entity."""
with patch(
"homeassistant.components.camera.Camera.stream_source",
return_value="rtsp://127.0.0.1/stream",
) as mock_camera_stream_source:
stream_source = await camera.async_get_stream_source(hass, "camera.demo_camera")
stream_source = await camera.async_get_stream_source(hass, "camera.demo_camera")
assert mock_camera_stream_source.called
assert stream_source == "rtsp://127.0.0.1/stream"
assert mock_stream_source.called
assert stream_source == STREAM_SOURCE
async def test_get_image_without_exists_camera(hass, image_mock_url):
@ -503,7 +548,7 @@ async def test_websocket_web_rtc_offer(
"id": 9,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": "v=0\r\n",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
@ -511,7 +556,7 @@ async def test_websocket_web_rtc_offer(
assert response["id"] == 9
assert response["type"] == TYPE_RESULT
assert response["success"]
assert response["result"]["answer"] == "a=sendonly"
assert response["result"]["answer"] == WEBRTC_ANSWER
async def test_websocket_web_rtc_offer_invalid_entity(
@ -526,7 +571,7 @@ async def test_websocket_web_rtc_offer_invalid_entity(
"id": 9,
"type": "camera/web_rtc_offer",
"entity_id": "camera.does_not_exist",
"offer": "v=0\r\n",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
@ -575,7 +620,7 @@ async def test_websocket_web_rtc_offer_failure(
"id": 9,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": "v=0\r\n",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
@ -604,7 +649,7 @@ async def test_websocket_web_rtc_offer_timeout(
"id": 9,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": "v=0\r\n",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
@ -628,7 +673,7 @@ async def test_websocket_web_rtc_offer_invalid_stream_type(
"id": 9,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": "v=0\r\n",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
@ -690,3 +735,145 @@ async def test_stream_unavailable(hass, hass_ws_client, mock_camera, mock_stream
demo_camera = hass.states.get("camera.demo_camera")
assert demo_camera is not None
assert demo_camera.state == camera.STATE_STREAMING
async def test_rtsp_to_web_rtc_offer(
hass,
hass_ws_client,
mock_camera,
mock_stream_source,
mock_rtsp_to_web_rtc,
):
"""Test creating a web_rtc offer from an rstp provider."""
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 9,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
assert response.get("id") == 9
assert response.get("type") == TYPE_RESULT
assert response.get("success")
assert "result" in response
assert response["result"] == {"answer": WEBRTC_ANSWER}
assert mock_rtsp_to_web_rtc.called
async def test_unsupported_rtsp_to_web_rtc_stream_type(
hass,
hass_ws_client,
mock_camera,
mock_hls_stream_source, # Not an RTSP stream source
mock_rtsp_to_web_rtc,
):
"""Test rtsp-to-webrtc is not registered for non-RTSP streams."""
client = await hass_ws_client(hass)
await client.send_json(
{
"id": 10,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
assert response.get("id") == 10
assert response.get("type") == TYPE_RESULT
assert "success" in response
assert not response["success"]
async def test_rtsp_to_web_rtc_provider_unregistered(
hass,
hass_ws_client,
mock_camera,
mock_stream_source,
):
"""Test creating a web_rtc offer from an rstp provider."""
mock_provider = Mock(side_effect=provide_web_rtc_answer)
unsub = camera.async_register_rtsp_to_web_rtc_provider(
hass, "mock_domain", mock_provider
)
client = await hass_ws_client(hass)
# Registered provider can handle the WebRTC offer
await client.send_json(
{
"id": 11,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
assert response["id"] == 11
assert response["type"] == TYPE_RESULT
assert response["success"]
assert response["result"]["answer"] == WEBRTC_ANSWER
assert mock_provider.called
mock_provider.reset_mock()
# Unregister provider, then verify the WebRTC offer cannot be handled
unsub()
await client.send_json(
{
"id": 12,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
assert response.get("id") == 12
assert response.get("type") == TYPE_RESULT
assert "success" in response
assert not response["success"]
assert not mock_provider.called
async def test_rtsp_to_web_rtc_offer_not_accepted(
hass,
hass_ws_client,
mock_camera,
mock_stream_source,
):
"""Test a provider that can't satisfy the rtsp to webrtc offer."""
async def provide_none(stream_source: str, offer: str) -> str:
"""Simulate a provider that can't accept the offer."""
return None
mock_provider = Mock(side_effect=provide_none)
unsub = camera.async_register_rtsp_to_web_rtc_provider(
hass, "mock_domain", mock_provider
)
client = await hass_ws_client(hass)
# Registered provider can handle the WebRTC offer
await client.send_json(
{
"id": 11,
"type": "camera/web_rtc_offer",
"entity_id": "camera.demo_camera",
"offer": WEBRTC_OFFER,
}
)
response = await client.receive_json()
assert response["id"] == 11
assert response.get("type") == TYPE_RESULT
assert "success" in response
assert not response["success"]
assert mock_provider.called
unsub()