Add support of taking a camera snapshot via go2rtc (#145205)

This commit is contained in:
Robert Resch 2025-06-20 10:34:43 +02:00 committed by GitHub
parent 84e9422254
commit 88683a318d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 441 additions and 243 deletions

View File

@ -240,6 +240,10 @@ async def _async_get_stream_image(
height: int | None = None,
wait_for_next_keyframe: bool = False,
) -> bytes | None:
if (provider := camera._webrtc_provider) and ( # noqa: SLF001
image := await provider.async_get_image(camera, width=width, height=height)
) is not None:
return image
if not camera.stream and CameraEntityFeature.STREAM in camera.supported_features:
camera.stream = await camera.async_create_stream()
if camera.stream:

View File

@ -156,6 +156,15 @@ class CameraWebRTCProvider(ABC):
"""Close the session."""
return ## This is an optional method so we need a default here.
async def async_get_image(
self,
camera: Camera,
width: int | None = None,
height: int | None = None,
) -> bytes | None:
"""Get an image from the camera."""
return None
@callback
def async_register_webrtc_provider(

View File

@ -1,8 +1,11 @@
"""The go2rtc component."""
from __future__ import annotations
import logging
import shutil
from aiohttp import ClientSession
from aiohttp.client_exceptions import ClientConnectionError, ServerConnectionError
from awesomeversion import AwesomeVersion
from go2rtc_client import Go2RtcRestClient
@ -32,7 +35,7 @@ from homeassistant.components.default_config import DOMAIN as DEFAULT_CONFIG_DOM
from homeassistant.config_entries import SOURCE_SYSTEM, ConfigEntry
from homeassistant.const import CONF_URL, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryNotReady, HomeAssistantError
from homeassistant.helpers import (
config_validation as cv,
discovery_flow,
@ -98,6 +101,7 @@ CONFIG_SCHEMA = vol.Schema(
_DATA_GO2RTC: HassKey[str] = HassKey(DOMAIN)
_RETRYABLE_ERRORS = (ClientConnectionError, ServerConnectionError)
type Go2RtcConfigEntry = ConfigEntry[WebRTCProvider]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
@ -151,13 +155,14 @@ async def _remove_go2rtc_entries(hass: HomeAssistant) -> None:
await hass.config_entries.async_remove(entry.entry_id)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: Go2RtcConfigEntry) -> bool:
"""Set up go2rtc from a config entry."""
url = hass.data[_DATA_GO2RTC]
url = hass.data[_DATA_GO2RTC]
session = async_get_clientsession(hass)
client = Go2RtcRestClient(session, url)
# Validate the server URL
try:
client = Go2RtcRestClient(async_get_clientsession(hass), url)
version = await client.validate_server_version()
if version < AwesomeVersion(RECOMMENDED_VERSION):
ir.async_create_issue(
@ -188,13 +193,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
_LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err)
return False
provider = WebRTCProvider(hass, url)
async_register_webrtc_provider(hass, provider)
provider = entry.runtime_data = WebRTCProvider(hass, url, session, client)
entry.async_on_unload(async_register_webrtc_provider(hass, provider))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: Go2RtcConfigEntry) -> bool:
"""Unload a go2rtc config entry."""
await entry.runtime_data.teardown()
return True
@ -206,12 +212,18 @@ async def _get_binary(hass: HomeAssistant) -> str | None:
class WebRTCProvider(CameraWebRTCProvider):
"""WebRTC provider."""
def __init__(self, hass: HomeAssistant, url: str) -> None:
def __init__(
self,
hass: HomeAssistant,
url: str,
session: ClientSession,
rest_client: Go2RtcRestClient,
) -> None:
"""Initialize the WebRTC provider."""
self._hass = hass
self._url = url
self._session = async_get_clientsession(hass)
self._rest_client = Go2RtcRestClient(self._session, url)
self._session = session
self._rest_client = rest_client
self._sessions: dict[str, Go2RtcWsClient] = {}
@property
@ -232,32 +244,16 @@ class WebRTCProvider(CameraWebRTCProvider):
send_message: WebRTCSendMessage,
) -> None:
"""Handle the WebRTC offer and return the answer via the provided callback."""
try:
await self._update_stream_source(camera)
except HomeAssistantError as err:
send_message(WebRTCError("go2rtc_webrtc_offer_failed", str(err)))
return
self._sessions[session_id] = ws_client = Go2RtcWsClient(
self._session, self._url, source=camera.entity_id
)
if not (stream_source := await camera.stream_source()):
send_message(
WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source")
)
return
streams = await self._rest_client.streams.list()
if (stream := streams.get(camera.entity_id)) is None or not any(
stream_source == producer.url for producer in stream.producers
):
await self._rest_client.streams.add(
camera.entity_id,
[
stream_source,
# We are setting any ffmpeg rtsp related logs to debug
# Connection problems to the camera will be logged by the first stream
# Therefore setting it to debug will not hide any important logs
f"ffmpeg:{camera.entity_id}#audio=opus#query=log_level=debug",
],
)
@callback
def on_messages(message: ReceiveMessages) -> None:
"""Handle messages."""
@ -291,3 +287,48 @@ class WebRTCProvider(CameraWebRTCProvider):
"""Close the session."""
ws_client = self._sessions.pop(session_id)
self._hass.async_create_task(ws_client.close())
async def async_get_image(
self,
camera: Camera,
width: int | None = None,
height: int | None = None,
) -> bytes | None:
"""Get an image from the camera."""
await self._update_stream_source(camera)
return await self._rest_client.get_jpeg_snapshot(
camera.entity_id, width, height
)
async def _update_stream_source(self, camera: Camera) -> None:
"""Update the stream source in go2rtc config if needed."""
if not (stream_source := await camera.stream_source()):
await self.teardown()
raise HomeAssistantError("Camera has no stream source")
if not self.async_is_supported(stream_source):
await self.teardown()
raise HomeAssistantError("Stream source is not supported by go2rtc")
streams = await self._rest_client.streams.list()
if (stream := streams.get(camera.entity_id)) is None or not any(
stream_source == producer.url for producer in stream.producers
):
await self._rest_client.streams.add(
camera.entity_id,
[
stream_source,
# We are setting any ffmpeg rtsp related logs to debug
# Connection problems to the camera will be logged by the first stream
# Therefore setting it to debug will not hide any important logs
f"ffmpeg:{camera.entity_id}#audio=opus#query=log_level=debug",
f"ffmpeg:{camera.entity_id}#video=mjpeg",
],
)
async def teardown(self) -> None:
"""Tear down the provider."""
for ws_client in self._sessions.values():
await ws_client.close()
self._sessions.clear()

View File

@ -567,6 +567,12 @@ def get_fixture_path(filename: str, integration: str | None = None) -> pathlib.P
)
@lru_cache
def load_fixture_bytes(filename: str, integration: str | None = None) -> bytes:
"""Load a fixture."""
return get_fixture_path(filename, integration).read_bytes()
@lru_cache
def load_fixture(filename: str, integration: str | None = None) -> str:
"""Load a fixture."""

View File

@ -1,5 +1,6 @@
"""The tests for the camera component."""
from collections.abc import Callable
from http import HTTPStatus
import io
from types import ModuleType
@ -876,6 +877,41 @@ async def test_entity_picture_url_changes_on_token_update(hass: HomeAssistant) -
assert "token=" in new_entity_picture
async def _register_test_webrtc_provider(hass: HomeAssistant) -> Callable[[], None]:
class SomeTestProvider(CameraWebRTCProvider):
"""Test provider."""
@property
def domain(self) -> str:
"""Return domain."""
return "test"
@callback
def async_is_supported(self, stream_source: str) -> bool:
"""Determine if the provider supports the stream source."""
return True
async def async_handle_async_webrtc_offer(
self,
camera: Camera,
offer_sdp: str,
session_id: str,
send_message: WebRTCSendMessage,
) -> None:
"""Handle the WebRTC offer and return the answer via the provided callback."""
send_message(WebRTCAnswer("answer"))
async def async_on_webrtc_candidate(
self, session_id: str, candidate: RTCIceCandidateInit
) -> None:
"""Handle the WebRTC candidate."""
provider = SomeTestProvider()
unsub = async_register_webrtc_provider(hass, provider)
await hass.async_block_till_done()
return unsub
async def _test_capabilities(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
@ -908,38 +944,7 @@ async def _test_capabilities(
await test(expected_stream_types)
# Test with WebRTC provider
class SomeTestProvider(CameraWebRTCProvider):
"""Test provider."""
@property
def domain(self) -> str:
"""Return domain."""
return "test"
@callback
def async_is_supported(self, stream_source: str) -> bool:
"""Determine if the provider supports the stream source."""
return True
async def async_handle_async_webrtc_offer(
self,
camera: Camera,
offer_sdp: str,
session_id: str,
send_message: WebRTCSendMessage,
) -> None:
"""Handle the WebRTC offer and return the answer via the provided callback."""
send_message(WebRTCAnswer("answer"))
async def async_on_webrtc_candidate(
self, session_id: str, candidate: RTCIceCandidateInit
) -> None:
"""Handle the WebRTC candidate."""
provider = SomeTestProvider()
async_register_webrtc_provider(hass, provider)
await hass.async_block_till_done()
await _register_test_webrtc_provider(hass)
await test(expected_stream_types_with_webrtc_provider)
@ -1026,3 +1031,82 @@ async def test_camera_capabilities_changing_native_support(
await hass.async_block_till_done()
await _test_capabilities(hass, hass_ws_client, cam.entity_id, set(), set())
@pytest.mark.usefixtures("mock_camera", "mock_stream_source")
async def test_snapshot_service_webrtc_provider(
hass: HomeAssistant,
) -> None:
"""Test snapshot service with the webrtc provider."""
await async_setup_component(hass, "camera", {})
await hass.async_block_till_done()
unsub = await _register_test_webrtc_provider(hass)
camera_obj = get_camera_from_entity_id(hass, "camera.demo_camera")
assert camera_obj._webrtc_provider
with (
patch.object(camera_obj, "use_stream_for_stills", return_value=True),
patch("homeassistant.components.camera.open"),
patch.object(
camera_obj._webrtc_provider,
"async_get_image",
wraps=camera_obj._webrtc_provider.async_get_image,
) as webrtc_get_image_mock,
patch.object(camera_obj, "stream", AsyncMock()) as stream_mock,
patch(
"homeassistant.components.camera.os.makedirs",
),
patch.object(hass.config, "is_allowed_path", return_value=True),
):
# WebRTC is not supporting get_image and the default implementation returns None
await hass.services.async_call(
camera.DOMAIN,
camera.SERVICE_SNAPSHOT,
{
ATTR_ENTITY_ID: camera_obj.entity_id,
camera.ATTR_FILENAME: "/test/snapshot.jpg",
},
blocking=True,
)
stream_mock.async_get_image.assert_called_once()
webrtc_get_image_mock.assert_called_once_with(
camera_obj, width=None, height=None
)
webrtc_get_image_mock.reset_mock()
stream_mock.reset_mock()
# Now provider supports get_image
webrtc_get_image_mock.return_value = b"Images bytes"
await hass.services.async_call(
camera.DOMAIN,
camera.SERVICE_SNAPSHOT,
{
ATTR_ENTITY_ID: camera_obj.entity_id,
camera.ATTR_FILENAME: "/test/snapshot.jpg",
},
blocking=True,
)
stream_mock.async_get_image.assert_not_called()
webrtc_get_image_mock.assert_called_once_with(
camera_obj, width=None, height=None
)
# Deregister provider
unsub()
await hass.async_block_till_done()
assert camera_obj._webrtc_provider is None
webrtc_get_image_mock.reset_mock()
stream_mock.reset_mock()
await hass.services.async_call(
camera.DOMAIN,
camera.SERVICE_SNAPSHOT,
{
ATTR_ENTITY_ID: camera_obj.entity_id,
camera.ATTR_FILENAME: "/test/snapshot.jpg",
},
blocking=True,
)
stream_mock.async_get_image.assert_called_once()
webrtc_get_image_mock.assert_not_called()

View File

@ -7,13 +7,7 @@ from homeassistant.components.feedreader.const import CONF_MAX_ENTRIES, DOMAIN
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry, load_fixture
def load_fixture_bytes(src: str) -> bytes:
"""Return byte stream of fixture."""
feed_data = load_fixture(src, DOMAIN)
return bytes(feed_data, "utf-8")
from tests.common import MockConfigEntry
def create_mock_entry(

View File

@ -2,78 +2,77 @@
import pytest
from homeassistant.components.feedreader.const import DOMAIN
from homeassistant.components.feedreader.coordinator import EVENT_FEEDREADER
from homeassistant.core import Event, HomeAssistant
from . import load_fixture_bytes
from tests.common import async_capture_events
from tests.common import async_capture_events, load_fixture_bytes
@pytest.fixture(name="feed_one_event")
def fixture_feed_one_event(hass: HomeAssistant) -> bytes:
"""Load test feed data for one event."""
return load_fixture_bytes("feedreader.xml")
return load_fixture_bytes("feedreader.xml", DOMAIN)
@pytest.fixture(name="feed_two_event")
def fixture_feed_two_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for two event."""
return load_fixture_bytes("feedreader1.xml")
return load_fixture_bytes("feedreader1.xml", DOMAIN)
@pytest.fixture(name="feed_21_events")
def fixture_feed_21_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for twenty one events."""
return load_fixture_bytes("feedreader2.xml")
return load_fixture_bytes("feedreader2.xml", DOMAIN)
@pytest.fixture(name="feed_three_events")
def fixture_feed_three_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for three events."""
return load_fixture_bytes("feedreader3.xml")
return load_fixture_bytes("feedreader3.xml", DOMAIN)
@pytest.fixture(name="feed_four_events")
def fixture_feed_four_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for three events."""
return load_fixture_bytes("feedreader4.xml")
return load_fixture_bytes("feedreader4.xml", DOMAIN)
@pytest.fixture(name="feed_atom_event")
def fixture_feed_atom_event(hass: HomeAssistant) -> bytes:
"""Load test feed data for atom event."""
return load_fixture_bytes("feedreader5.xml")
return load_fixture_bytes("feedreader5.xml", DOMAIN)
@pytest.fixture(name="feed_identically_timed_events")
def fixture_feed_identically_timed_events(hass: HomeAssistant) -> bytes:
"""Load test feed data for two events published at the exact same time."""
return load_fixture_bytes("feedreader6.xml")
return load_fixture_bytes("feedreader6.xml", DOMAIN)
@pytest.fixture(name="feed_without_items")
def fixture_feed_without_items(hass: HomeAssistant) -> bytes:
"""Load test feed without any items."""
return load_fixture_bytes("feedreader7.xml")
return load_fixture_bytes("feedreader7.xml", DOMAIN)
@pytest.fixture(name="feed_only_summary")
def fixture_feed_only_summary(hass: HomeAssistant) -> bytes:
"""Load test feed data with one event containing only a summary, no content."""
return load_fixture_bytes("feedreader8.xml")
return load_fixture_bytes("feedreader8.xml", DOMAIN)
@pytest.fixture(name="feed_htmlentities")
def fixture_feed_htmlentities(hass: HomeAssistant) -> bytes:
"""Load test feed data with HTML Entities."""
return load_fixture_bytes("feedreader9.xml")
return load_fixture_bytes("feedreader9.xml", DOMAIN)
@pytest.fixture(name="feed_atom_htmlentities")
def fixture_feed_atom_htmlentities(hass: HomeAssistant) -> bytes:
"""Load test ATOM feed data with HTML Entities."""
return load_fixture_bytes("feedreader10.xml")
return load_fixture_bytes("feedreader10.xml", DOMAIN)
@pytest.fixture(name="events")

View File

@ -1 +1,32 @@
"""Go2rtc tests."""
from homeassistant.components.camera import Camera, CameraEntityFeature
class MockCamera(Camera):
"""Mock Camera Entity."""
_attr_name = "Test"
_attr_supported_features: CameraEntityFeature = CameraEntityFeature.STREAM
def __init__(self) -> None:
"""Initialize the mock entity."""
super().__init__()
self._stream_source: str | None = "rtsp://stream"
def set_stream_source(self, stream_source: str | None) -> None:
"""Set the stream source."""
self._stream_source = stream_source
async def stream_source(self) -> str | None:
"""Return the source of the stream.
This is used by cameras with CameraEntityFeature.STREAM
and StreamType.HLS.
"""
return self._stream_source
@property
def use_stream_for_stills(self) -> bool:
"""Always use the RTSP stream to generate snapshots."""
return True

View File

@ -7,8 +7,24 @@ from awesomeversion import AwesomeVersion
from go2rtc_client.rest import _StreamClient, _WebRTCClient
import pytest
from homeassistant.components.go2rtc.const import RECOMMENDED_VERSION
from homeassistant.components.camera import DOMAIN as CAMERA_DOMAIN
from homeassistant.components.go2rtc.const import DOMAIN, RECOMMENDED_VERSION
from homeassistant.components.go2rtc.server import Server
from homeassistant.config_entries import ConfigEntry, ConfigFlow
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from . import MockCamera
from tests.common import (
MockConfigEntry,
MockModule,
mock_config_flow,
mock_integration,
mock_platform,
setup_test_component_platform,
)
GO2RTC_PATH = "homeassistant.components.go2rtc"
@ -18,7 +34,7 @@ def rest_client() -> Generator[AsyncMock]:
"""Mock a go2rtc rest client."""
with (
patch(
"homeassistant.components.go2rtc.Go2RtcRestClient",
"homeassistant.components.go2rtc.Go2RtcRestClient", autospec=True
) as mock_client,
patch("homeassistant.components.go2rtc.server.Go2RtcRestClient", mock_client),
):
@ -94,3 +110,104 @@ def server(server_start: AsyncMock, server_stop: AsyncMock) -> Generator[AsyncMo
"""Mock a go2rtc server."""
with patch(f"{GO2RTC_PATH}.Server", wraps=Server) as mock_server:
yield mock_server
@pytest.fixture(name="is_docker_env")
def is_docker_env_fixture() -> bool:
"""Fixture to provide is_docker_env return value."""
return True
@pytest.fixture
def mock_is_docker_env(is_docker_env: bool) -> Generator[Mock]:
"""Mock is_docker_env."""
with patch(
"homeassistant.components.go2rtc.is_docker_env",
return_value=is_docker_env,
) as mock_is_docker_env:
yield mock_is_docker_env
@pytest.fixture(name="go2rtc_binary")
def go2rtc_binary_fixture() -> str:
"""Fixture to provide go2rtc binary name."""
return "/usr/bin/go2rtc"
@pytest.fixture
def mock_get_binary(go2rtc_binary: str) -> Generator[Mock]:
"""Mock _get_binary."""
with patch(
"homeassistant.components.go2rtc.shutil.which",
return_value=go2rtc_binary,
) as mock_which:
yield mock_which
@pytest.fixture
async def init_integration(
hass: HomeAssistant,
rest_client: AsyncMock,
mock_is_docker_env: Generator[Mock],
mock_get_binary: Generator[Mock],
server: Mock,
) -> None:
"""Initialize the go2rtc integration."""
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
TEST_DOMAIN = "test"
@pytest.fixture
def integration_config_entry(hass: HomeAssistant) -> ConfigEntry:
"""Test mock config entry."""
entry = MockConfigEntry(domain=TEST_DOMAIN)
entry.add_to_hass(hass)
return entry
@pytest.fixture
async def init_test_integration(
hass: HomeAssistant,
integration_config_entry: ConfigEntry,
) -> MockCamera:
"""Initialize components."""
async def async_setup_entry_init(
hass: HomeAssistant, config_entry: ConfigEntry
) -> bool:
"""Set up test config entry."""
await hass.config_entries.async_forward_entry_setups(
config_entry, [Platform.CAMERA]
)
return True
async def async_unload_entry_init(
hass: HomeAssistant, config_entry: ConfigEntry
) -> bool:
"""Unload test config entry."""
await hass.config_entries.async_forward_entry_unload(
config_entry, Platform.CAMERA
)
return True
mock_integration(
hass,
MockModule(
TEST_DOMAIN,
async_setup_entry=async_setup_entry_init,
async_unload_entry=async_unload_entry_init,
),
)
test_camera = MockCamera()
setup_test_component_platform(
hass, CAMERA_DOMAIN, [test_camera], from_config_entry=True
)
mock_platform(hass, f"{TEST_DOMAIN}.config_flow", Mock())
with mock_config_flow(TEST_DOMAIN, ConfigFlow):
assert await hass.config_entries.async_setup(integration_config_entry.entry_id)
await hass.async_block_till_done()
return test_camera

Binary file not shown.

After

Width:  |  Height:  |  Size: 286 KiB

View File

@ -1,6 +1,6 @@
"""The tests for the go2rtc component."""
from collections.abc import Callable, Generator
from collections.abc import Callable
import logging
from typing import NamedTuple
from unittest.mock import AsyncMock, Mock, patch
@ -21,41 +21,32 @@ import pytest
from webrtc_models import RTCIceCandidateInit
from homeassistant.components.camera import (
DOMAIN as CAMERA_DOMAIN,
Camera,
CameraEntityFeature,
StreamType,
WebRTCAnswer as HAWebRTCAnswer,
WebRTCCandidate as HAWebRTCCandidate,
WebRTCError,
WebRTCMessage,
WebRTCSendMessage,
async_get_image,
)
from homeassistant.components.default_config import DOMAIN as DEFAULT_CONFIG_DOMAIN
from homeassistant.components.go2rtc import WebRTCProvider
from homeassistant.components.go2rtc import HomeAssistant, WebRTCProvider
from homeassistant.components.go2rtc.const import (
CONF_DEBUG_UI,
DEBUG_UI_URL_MESSAGE,
DOMAIN,
RECOMMENDED_VERSION,
)
from homeassistant.config_entries import ConfigEntry, ConfigEntryState, ConfigFlow
from homeassistant.const import CONF_URL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_URL
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import issue_registry as ir
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import async_setup_component
from tests.common import (
MockConfigEntry,
MockModule,
mock_config_flow,
mock_integration,
mock_platform,
setup_test_component_platform,
)
from . import MockCamera
TEST_DOMAIN = "test"
from tests.common import MockConfigEntry, load_fixture_bytes
# The go2rtc provider does not inspect the details of the offer and answer,
# and is only a pass through.
@ -63,54 +54,6 @@ OFFER_SDP = "v=0\r\no=carol 28908764872 28908764872 IN IP4 100.3.6.6\r\n..."
ANSWER_SDP = "v=0\r\no=bob 2890844730 2890844730 IN IP4 host.example.com\r\n..."
class MockCamera(Camera):
"""Mock Camera Entity."""
_attr_name = "Test"
_attr_supported_features: CameraEntityFeature = CameraEntityFeature.STREAM
def __init__(self) -> None:
"""Initialize the mock entity."""
super().__init__()
self._stream_source: str | None = "rtsp://stream"
def set_stream_source(self, stream_source: str | None) -> None:
"""Set the stream source."""
self._stream_source = stream_source
async def stream_source(self) -> str | None:
"""Return the source of the stream.
This is used by cameras with CameraEntityFeature.STREAM
and StreamType.HLS.
"""
return self._stream_source
@pytest.fixture
def integration_config_entry(hass: HomeAssistant) -> ConfigEntry:
"""Test mock config entry."""
entry = MockConfigEntry(domain=TEST_DOMAIN)
entry.add_to_hass(hass)
return entry
@pytest.fixture(name="go2rtc_binary")
def go2rtc_binary_fixture() -> str:
"""Fixture to provide go2rtc binary name."""
return "/usr/bin/go2rtc"
@pytest.fixture
def mock_get_binary(go2rtc_binary) -> Generator[Mock]:
"""Mock _get_binary."""
with patch(
"homeassistant.components.go2rtc.shutil.which",
return_value=go2rtc_binary,
) as mock_which:
yield mock_which
@pytest.fixture(name="has_go2rtc_entry")
def has_go2rtc_entry_fixture() -> bool:
"""Fixture to control if a go2rtc config entry should be created."""
@ -126,80 +69,6 @@ def mock_go2rtc_entry(hass: HomeAssistant, has_go2rtc_entry: bool) -> None:
config_entry.add_to_hass(hass)
@pytest.fixture(name="is_docker_env")
def is_docker_env_fixture() -> bool:
"""Fixture to provide is_docker_env return value."""
return True
@pytest.fixture
def mock_is_docker_env(is_docker_env) -> Generator[Mock]:
"""Mock is_docker_env."""
with patch(
"homeassistant.components.go2rtc.is_docker_env",
return_value=is_docker_env,
) as mock_is_docker_env:
yield mock_is_docker_env
@pytest.fixture
async def init_integration(
hass: HomeAssistant,
rest_client: AsyncMock,
mock_is_docker_env,
mock_get_binary,
server: Mock,
) -> None:
"""Initialize the go2rtc integration."""
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
@pytest.fixture
async def init_test_integration(
hass: HomeAssistant,
integration_config_entry: ConfigEntry,
) -> MockCamera:
"""Initialize components."""
async def async_setup_entry_init(
hass: HomeAssistant, config_entry: ConfigEntry
) -> bool:
"""Set up test config entry."""
await hass.config_entries.async_forward_entry_setups(
config_entry, [Platform.CAMERA]
)
return True
async def async_unload_entry_init(
hass: HomeAssistant, config_entry: ConfigEntry
) -> bool:
"""Unload test config entry."""
await hass.config_entries.async_forward_entry_unload(
config_entry, Platform.CAMERA
)
return True
mock_integration(
hass,
MockModule(
TEST_DOMAIN,
async_setup_entry=async_setup_entry_init,
async_unload_entry=async_unload_entry_init,
),
)
test_camera = MockCamera()
setup_test_component_platform(
hass, CAMERA_DOMAIN, [test_camera], from_config_entry=True
)
mock_platform(hass, f"{TEST_DOMAIN}.config_flow", Mock())
with mock_config_flow(TEST_DOMAIN, ConfigFlow):
assert await hass.config_entries.async_setup(integration_config_entry.entry_id)
await hass.async_block_till_done()
return test_camera
async def _test_setup_and_signaling(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
@ -218,14 +87,18 @@ async def _test_setup_and_signaling(
assert issue_registry.async_get_issue(DOMAIN, "recommended_version") is None
config_entries = hass.config_entries.async_entries(DOMAIN)
assert len(config_entries) == 1
assert config_entries[0].state == ConfigEntryState.LOADED
config_entry = config_entries[0]
assert config_entry.state == ConfigEntryState.LOADED
after_setup_fn()
receive_message_callback = Mock(spec_set=WebRTCSendMessage)
async def test() -> None:
sessions = []
async def test(session: str) -> None:
sessions.append(session)
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
OFFER_SDP, session, receive_message_callback
)
ws_client.send.assert_called_once_with(
WebRTCOffer(
@ -240,13 +113,14 @@ async def _test_setup_and_signaling(
callback(WebRTCAnswer(ANSWER_SDP))
receive_message_callback.assert_called_once_with(HAWebRTCAnswer(ANSWER_SDP))
await test()
await test("sesion_1")
rest_client.streams.add.assert_called_once_with(
entity_id,
[
"rtsp://stream",
f"ffmpeg:{camera.entity_id}#audio=opus#query=log_level=debug",
f"ffmpeg:{camera.entity_id}#video=mjpeg",
],
)
@ -258,13 +132,14 @@ async def _test_setup_and_signaling(
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
await test("session_2")
rest_client.streams.add.assert_called_once_with(
entity_id,
[
"rtsp://stream",
f"ffmpeg:{camera.entity_id}#audio=opus#query=log_level=debug",
f"ffmpeg:{camera.entity_id}#video=mjpeg",
],
)
@ -276,25 +151,37 @@ async def _test_setup_and_signaling(
receive_message_callback.reset_mock()
ws_client.reset_mock()
await test()
await test("session_3")
rest_client.streams.add.assert_not_called()
assert isinstance(camera._webrtc_provider, WebRTCProvider)
# Set stream source to None and provider should be skipped
rest_client.streams.list.return_value = {}
receive_message_callback.reset_mock()
camera.set_stream_source(None)
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
)
receive_message_callback.assert_called_once_with(
WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source")
)
provider = camera._webrtc_provider
for session in sessions:
assert session in provider._sessions
with patch.object(provider, "teardown", wraps=provider.teardown) as teardown:
# Set stream source to None and provider should be skipped
rest_client.streams.list.return_value = {}
receive_message_callback.reset_mock()
camera.set_stream_source(None)
await camera.async_handle_async_webrtc_offer(
OFFER_SDP, "session_id", receive_message_callback
)
receive_message_callback.assert_called_once_with(
WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source")
)
teardown.assert_called_once()
# We use one ws_client mock for all sessions
assert ws_client.close.call_count == len(sessions)
await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state == ConfigEntryState.NOT_LOADED
assert teardown.call_count == 2
@pytest.mark.usefixtures(
"init_test_integration",
"mock_get_binary",
"mock_is_docker_env",
"mock_go2rtc_entry",
@ -757,3 +644,29 @@ async def test_setup_with_recommended_version_repair(
"recommended_version": RECOMMENDED_VERSION,
"current_version": "1.9.5",
}
@pytest.mark.usefixtures("init_integration")
async def test_async_get_image(
hass: HomeAssistant,
init_test_integration: MockCamera,
rest_client: AsyncMock,
) -> None:
"""Test getting snapshot from go2rtc."""
camera = init_test_integration
assert isinstance(camera._webrtc_provider, WebRTCProvider)
image_bytes = load_fixture_bytes("snapshot.jpg", DOMAIN)
rest_client.get_jpeg_snapshot.return_value = image_bytes
assert await camera._webrtc_provider.async_get_image(camera) == image_bytes
image = await async_get_image(hass, camera.entity_id)
assert image.content == image_bytes
camera.set_stream_source("invalid://not_supported")
with pytest.raises(
HomeAssistantError, match="Stream source is not supported by go2rtc"
):
await async_get_image(hass, camera.entity_id)