Add go2rtc workaround for HA managed one until upstream fixes it (#130139)

This commit is contained in:
Robert Resch 2024-11-08 15:54:14 +01:00 committed by GitHub
parent 6c7ac7a6ef
commit 51e691f832
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 270 additions and 37 deletions

View File

@ -1,5 +1,8 @@
"""The go2rtc component.""" """The go2rtc component."""
from __future__ import annotations
from dataclasses import dataclass
import logging import logging
import shutil import shutil
@ -38,7 +41,13 @@ from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey from homeassistant.util.hass_dict import HassKey
from homeassistant.util.package import is_docker_env from homeassistant.util.package import is_docker_env
from .const import CONF_DEBUG_UI, DEBUG_UI_URL_MESSAGE, DOMAIN, HA_MANAGED_URL from .const import (
CONF_DEBUG_UI,
DEBUG_UI_URL_MESSAGE,
DOMAIN,
HA_MANAGED_RTSP_PORT,
HA_MANAGED_URL,
)
from .server import Server from .server import Server
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -85,13 +94,22 @@ CONFIG_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA, extra=vol.ALLOW_EXTRA,
) )
_DATA_GO2RTC: HassKey[str] = HassKey(DOMAIN) _DATA_GO2RTC: HassKey[Go2RtcData] = HassKey(DOMAIN)
_RETRYABLE_ERRORS = (ClientConnectionError, ServerConnectionError) _RETRYABLE_ERRORS = (ClientConnectionError, ServerConnectionError)
@dataclass(frozen=True)
class Go2RtcData:
"""Data for go2rtc."""
url: str
managed: bool
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up WebRTC.""" """Set up WebRTC."""
url: str | None = None url: str | None = None
managed = False
if DOMAIN not in config and DEFAULT_CONFIG_DOMAIN not in config: if DOMAIN not in config and DEFAULT_CONFIG_DOMAIN not in config:
await _remove_go2rtc_entries(hass) await _remove_go2rtc_entries(hass)
return True return True
@ -126,8 +144,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, on_stop) hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, on_stop)
url = HA_MANAGED_URL url = HA_MANAGED_URL
managed = True
hass.data[_DATA_GO2RTC] = url hass.data[_DATA_GO2RTC] = Go2RtcData(url, managed)
discovery_flow.async_create_flow( discovery_flow.async_create_flow(
hass, DOMAIN, context={"source": SOURCE_SYSTEM}, data={} hass, DOMAIN, context={"source": SOURCE_SYSTEM}, data={}
) )
@ -142,28 +161,32 @@ async def _remove_go2rtc_entries(hass: HomeAssistant) -> None:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up go2rtc from a config entry.""" """Set up go2rtc from a config entry."""
url = hass.data[_DATA_GO2RTC] data = hass.data[_DATA_GO2RTC]
# Validate the server URL # Validate the server URL
try: try:
client = Go2RtcRestClient(async_get_clientsession(hass), url) client = Go2RtcRestClient(async_get_clientsession(hass), data.url)
await client.validate_server_version() await client.validate_server_version()
except Go2RtcClientError as err: except Go2RtcClientError as err:
if isinstance(err.__cause__, _RETRYABLE_ERRORS): if isinstance(err.__cause__, _RETRYABLE_ERRORS):
raise ConfigEntryNotReady( raise ConfigEntryNotReady(
f"Could not connect to go2rtc instance on {url}" f"Could not connect to go2rtc instance on {data.url}"
) from err ) from err
_LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err) _LOGGER.warning(
"Could not connect to go2rtc instance on %s (%s)", data.url, err
)
return False return False
except Go2RtcVersionError as err: except Go2RtcVersionError as err:
raise ConfigEntryNotReady( raise ConfigEntryNotReady(
f"The go2rtc server version is not supported, {err}" f"The go2rtc server version is not supported, {err}"
) from err ) from err
except Exception as err: # noqa: BLE001 except Exception as err: # noqa: BLE001
_LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err) _LOGGER.warning(
"Could not connect to go2rtc instance on %s (%s)", data.url, err
)
return False return False
provider = WebRTCProvider(hass, url) provider = WebRTCProvider(hass, data)
async_register_webrtc_provider(hass, provider) async_register_webrtc_provider(hass, provider)
return True return True
@ -181,12 +204,12 @@ async def _get_binary(hass: HomeAssistant) -> str | None:
class WebRTCProvider(CameraWebRTCProvider): class WebRTCProvider(CameraWebRTCProvider):
"""WebRTC provider.""" """WebRTC provider."""
def __init__(self, hass: HomeAssistant, url: str) -> None: def __init__(self, hass: HomeAssistant, data: Go2RtcData) -> None:
"""Initialize the WebRTC provider.""" """Initialize the WebRTC provider."""
self._hass = hass self._hass = hass
self._url = url self._data = data
self._session = async_get_clientsession(hass) self._session = async_get_clientsession(hass)
self._rest_client = Go2RtcRestClient(self._session, url) self._rest_client = Go2RtcRestClient(self._session, data.url)
self._sessions: dict[str, Go2RtcWsClient] = {} self._sessions: dict[str, Go2RtcWsClient] = {}
@property @property
@ -208,7 +231,7 @@ class WebRTCProvider(CameraWebRTCProvider):
) -> None: ) -> None:
"""Handle the WebRTC offer and return the answer via the provided callback.""" """Handle the WebRTC offer and return the answer via the provided callback."""
self._sessions[session_id] = ws_client = Go2RtcWsClient( self._sessions[session_id] = ws_client = Go2RtcWsClient(
self._session, self._url, source=camera.entity_id self._session, self._data.url, source=camera.entity_id
) )
if not (stream_source := await camera.stream_source()): if not (stream_source := await camera.stream_source()):
@ -219,8 +242,30 @@ class WebRTCProvider(CameraWebRTCProvider):
streams = await self._rest_client.streams.list() streams = await self._rest_client.streams.list()
if (stream := streams.get(camera.entity_id)) is None or not any( if self._data.managed:
stream_source == producer.url for producer in stream.producers # HA manages the go2rtc instance
stream_org_name = camera.entity_id + "_orginal"
stream_redirect_sources = [
f"rtsp://127.0.0.1:{HA_MANAGED_RTSP_PORT}/{stream_org_name}",
f"ffmpeg:{stream_org_name}#audio=opus",
]
if (
(stream_org := streams.get(stream_org_name)) is None
or not any(
stream_source == producer.url for producer in stream_org.producers
)
or (stream_redirect := streams.get(camera.entity_id)) is None
or stream_redirect_sources != [p.url for p in stream_redirect.producers]
):
await self._rest_client.streams.add(stream_org_name, stream_source)
await self._rest_client.streams.add(
camera.entity_id, stream_redirect_sources
)
# go2rtc instance is managed outside HA
elif (stream_org := streams.get(camera.entity_id)) is None or not any(
stream_source == producer.url for producer in stream_org.producers
): ):
await self._rest_client.streams.add( await self._rest_client.streams.add(
camera.entity_id, camera.entity_id,

View File

@ -6,3 +6,4 @@ CONF_DEBUG_UI = "debug_ui"
DEBUG_UI_URL_MESSAGE = "Url and debug_ui cannot be set at the same time." DEBUG_UI_URL_MESSAGE = "Url and debug_ui cannot be set at the same time."
HA_MANAGED_API_PORT = 11984 HA_MANAGED_API_PORT = 11984
HA_MANAGED_URL = f"http://localhost:{HA_MANAGED_API_PORT}/" HA_MANAGED_URL = f"http://localhost:{HA_MANAGED_API_PORT}/"
HA_MANAGED_RTSP_PORT = 18554

View File

@ -12,7 +12,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import HA_MANAGED_API_PORT, HA_MANAGED_URL from .const import HA_MANAGED_API_PORT, HA_MANAGED_RTSP_PORT, HA_MANAGED_URL
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
_TERMINATE_TIMEOUT = 5 _TERMINATE_TIMEOUT = 5
@ -24,15 +24,16 @@ _RESPAWN_COOLDOWN = 1
# Default configuration for HA # Default configuration for HA
# - Api is listening only on localhost # - Api is listening only on localhost
# - Disable rtsp listener # - Enable rtsp for localhost only as ffmpeg needs it
# - Clear default ice servers # - Clear default ice servers
_GO2RTC_CONFIG_FORMAT = r""" _GO2RTC_CONFIG_FORMAT = r"""# This file is managed by Home Assistant
# Do not edit it manually
api: api:
listen: "{api_ip}:{api_port}" listen: "{api_ip}:{api_port}"
rtsp: rtsp:
# ffmpeg needs rtsp for opus audio transcoding listen: "127.0.0.1:{rtsp_port}"
listen: "127.0.0.1:18554"
webrtc: webrtc:
listen: ":18555/tcp" listen: ":18555/tcp"
@ -67,7 +68,9 @@ def _create_temp_file(api_ip: str) -> str:
with NamedTemporaryFile(prefix="go2rtc_", suffix=".yaml", delete=False) as file: with NamedTemporaryFile(prefix="go2rtc_", suffix=".yaml", delete=False) as file:
file.write( file.write(
_GO2RTC_CONFIG_FORMAT.format( _GO2RTC_CONFIG_FORMAT.format(
api_ip=api_ip, api_port=HA_MANAGED_API_PORT api_ip=api_ip,
api_port=HA_MANAGED_API_PORT,
rtsp_port=HA_MANAGED_RTSP_PORT,
).encode() ).encode()
) )
return file.name return file.name

View File

@ -3,7 +3,7 @@
from collections.abc import Callable, Generator from collections.abc import Callable, Generator
import logging import logging
from typing import NamedTuple from typing import NamedTuple
from unittest.mock import AsyncMock, Mock, patch from unittest.mock import AsyncMock, Mock, call, patch
from aiohttp.client_exceptions import ClientConnectionError, ServerConnectionError from aiohttp.client_exceptions import ClientConnectionError, ServerConnectionError
from go2rtc_client import Stream from go2rtc_client import Stream
@ -296,7 +296,7 @@ async def _test_setup_and_signaling(
], ],
) )
@pytest.mark.parametrize("has_go2rtc_entry", [True, False]) @pytest.mark.parametrize("has_go2rtc_entry", [True, False])
async def test_setup_go_binary( async def test_setup_managed(
hass: HomeAssistant, hass: HomeAssistant,
rest_client: AsyncMock, rest_client: AsyncMock,
ws_client: Mock, ws_client: Mock,
@ -308,15 +308,131 @@ async def test_setup_go_binary(
config: ConfigType, config: ConfigType,
ui_enabled: bool, ui_enabled: bool,
) -> None: ) -> None:
"""Test the go2rtc config entry with binary.""" """Test the go2rtc setup with managed go2rtc instance."""
assert (len(hass.config_entries.async_entries(DOMAIN)) == 1) == has_go2rtc_entry assert (len(hass.config_entries.async_entries(DOMAIN)) == 1) == has_go2rtc_entry
camera = init_test_integration
def after_setup() -> None: entity_id = camera.entity_id
server.assert_called_once_with(hass, "/usr/bin/go2rtc", enable_ui=ui_enabled) stream_name_orginal = camera.entity_id + "_orginal"
server_start.assert_called_once() assert camera.frontend_stream_type == StreamType.HLS
await _test_setup_and_signaling( assert await async_setup_component(hass, DOMAIN, config)
hass, rest_client, ws_client, config, after_setup, init_test_integration await hass.async_block_till_done(wait_background_tasks=True)
config_entries = hass.config_entries.async_entries(DOMAIN)
assert len(config_entries) == 1
assert config_entries[0].state == ConfigEntryState.LOADED
server.assert_called_once_with(hass, "/usr/bin/go2rtc", enable_ui=ui_enabled)
server_start.assert_called_once()
receive_message_callback = Mock(spec_set=WebRTCSendMessage)
async def test() -> None:
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
)
ws_client.send.assert_called_once_with(
WebRTCOffer(
OFFER_SDP,
camera.async_get_webrtc_client_configuration().configuration.ice_servers,
)
)
ws_client.subscribe.assert_called_once()
# Simulate the answer from the go2rtc server
callback = ws_client.subscribe.call_args[0][0]
callback(WebRTCAnswer(ANSWER_SDP))
receive_message_callback.assert_called_once_with(HAWebRTCAnswer(ANSWER_SDP))
await test()
stream_added_calls = [
call(stream_name_orginal, "rtsp://stream"),
call(
entity_id,
[
f"rtsp://127.0.0.1:18554/{stream_name_orginal}",
f"ffmpeg:{stream_name_orginal}#audio=opus",
],
),
]
assert rest_client.streams.add.call_args_list == stream_added_calls
# Stream original missing
rest_client.streams.add.reset_mock()
rest_client.streams.list.return_value = {
entity_id: Stream(
[
Producer(f"rtsp://127.0.0.1:18554/{stream_name_orginal}"),
Producer(f"ffmpeg:{stream_name_orginal}#audio=opus"),
]
)
}
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
assert rest_client.streams.add.call_args_list == stream_added_calls
# Stream original source different
rest_client.streams.add.reset_mock()
rest_client.streams.list.return_value = {
stream_name_orginal: Stream([Producer("rtsp://different")]),
entity_id: Stream(
[
Producer(f"rtsp://127.0.0.1:18554/{stream_name_orginal}"),
Producer(f"ffmpeg:{stream_name_orginal}#audio=opus"),
]
),
}
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
assert rest_client.streams.add.call_args_list == stream_added_calls
# Stream source different
rest_client.streams.add.reset_mock()
rest_client.streams.list.return_value = {
stream_name_orginal: Stream([Producer("rtsp://stream")]),
entity_id: Stream([Producer("rtsp://different")]),
}
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
assert rest_client.streams.add.call_args_list == stream_added_calls
# If the stream is already added, the stream should not be added again.
rest_client.streams.add.reset_mock()
rest_client.streams.list.return_value = {
stream_name_orginal: Stream([Producer("rtsp://stream")]),
entity_id: Stream(
[
Producer(f"rtsp://127.0.0.1:18554/{stream_name_orginal}"),
Producer(f"ffmpeg:{stream_name_orginal}#audio=opus"),
]
),
}
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
rest_client.streams.add.assert_not_called()
assert isinstance(camera._webrtc_provider, WebRTCProvider)
# Set stream source to None and provider should be skipped
rest_client.streams.list.return_value = {}
receive_message_callback.reset_mock()
camera.set_stream_source(None)
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
)
receive_message_callback.assert_called_once_with(
WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source")
) )
await hass.async_stop() await hass.async_stop()
@ -332,7 +448,7 @@ async def test_setup_go_binary(
], ],
) )
@pytest.mark.parametrize("has_go2rtc_entry", [True, False]) @pytest.mark.parametrize("has_go2rtc_entry", [True, False])
async def test_setup_go( async def test_setup_self_hosted(
hass: HomeAssistant, hass: HomeAssistant,
rest_client: AsyncMock, rest_client: AsyncMock,
ws_client: Mock, ws_client: Mock,
@ -342,16 +458,83 @@ async def test_setup_go(
mock_is_docker_env: Mock, mock_is_docker_env: Mock,
has_go2rtc_entry: bool, has_go2rtc_entry: bool,
) -> None: ) -> None:
"""Test the go2rtc config entry without binary.""" """Test the go2rtc with selfhosted go2rtc instance."""
assert (len(hass.config_entries.async_entries(DOMAIN)) == 1) == has_go2rtc_entry assert (len(hass.config_entries.async_entries(DOMAIN)) == 1) == has_go2rtc_entry
config = {DOMAIN: {CONF_URL: "http://localhost:1984/"}} config = {DOMAIN: {CONF_URL: "http://localhost:1984/"}}
camera = init_test_integration
def after_setup() -> None: entity_id = camera.entity_id
server.assert_not_called() assert camera.frontend_stream_type == StreamType.HLS
await _test_setup_and_signaling( assert await async_setup_component(hass, DOMAIN, config)
hass, rest_client, ws_client, config, after_setup, init_test_integration await hass.async_block_till_done(wait_background_tasks=True)
config_entries = hass.config_entries.async_entries(DOMAIN)
assert len(config_entries) == 1
assert config_entries[0].state == ConfigEntryState.LOADED
server.assert_not_called()
receive_message_callback = Mock(spec_set=WebRTCSendMessage)
async def test() -> None:
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
)
ws_client.send.assert_called_once_with(
WebRTCOffer(
OFFER_SDP,
camera.async_get_webrtc_client_configuration().configuration.ice_servers,
)
)
ws_client.subscribe.assert_called_once()
# Simulate the answer from the go2rtc server
callback = ws_client.subscribe.call_args[0][0]
callback(WebRTCAnswer(ANSWER_SDP))
receive_message_callback.assert_called_once_with(HAWebRTCAnswer(ANSWER_SDP))
await test()
rest_client.streams.add.assert_called_once_with(
entity_id, ["rtsp://stream", f"ffmpeg:{camera.entity_id}#audio=opus"]
)
# Stream exists but the source is different
rest_client.streams.add.reset_mock()
rest_client.streams.list.return_value = {
entity_id: Stream([Producer("rtsp://different")])
}
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
rest_client.streams.add.assert_called_once_with(
entity_id, ["rtsp://stream", f"ffmpeg:{camera.entity_id}#audio=opus"]
)
# If the stream is already added, the stream should not be added again.
rest_client.streams.add.reset_mock()
rest_client.streams.list.return_value = {
entity_id: Stream([Producer("rtsp://stream")])
}
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
rest_client.streams.add.assert_not_called()
assert isinstance(camera._webrtc_provider, WebRTCProvider)
# Set stream source to None and provider should be skipped
rest_client.streams.list.return_value = {}
receive_message_callback.reset_mock()
camera.set_stream_source(None)
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
)
receive_message_callback.assert_called_once_with(
WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source")
) )
mock_get_binary.assert_not_called() mock_get_binary.assert_not_called()

View File

@ -105,12 +105,13 @@ async def test_server_run_success(
# Verify that the config file was written # Verify that the config file was written
mock_tempfile.write.assert_called_once_with( mock_tempfile.write.assert_called_once_with(
f""" f"""# This file is managed by Home Assistant
# Do not edit it manually
api: api:
listen: "{api_ip}:11984" listen: "{api_ip}:11984"
rtsp: rtsp:
# ffmpeg needs rtsp for opus audio transcoding
listen: "127.0.0.1:18554" listen: "127.0.0.1:18554"
webrtc: webrtc: