Make UniFI Protect views more versatile (#82928)

This commit is contained in:
Christopher Bailey 2022-11-29 23:49:16 -05:00 committed by GitHub
parent 1908feab79
commit 6c5aa3b8f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 4 deletions

View File

@ -8,11 +8,12 @@ from typing import Any
from urllib.parse import urlencode
from aiohttp import web
from pyunifiprotect.data import Event
from pyunifiprotect.data import Camera, Event
from pyunifiprotect.exceptions import ClientError
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr, entity_registry as er
from .const import DOMAIN
from .data import ProtectData
@ -104,8 +105,10 @@ class ProtectProxyView(HomeAssistantView):
def _get_data_or_404(self, nvr_id: str) -> ProtectData | web.Response:
all_data: list[ProtectData] = []
for data in self.data.values():
for entry_id, data in self.data.items():
if isinstance(data, ProtectData):
if nvr_id == entry_id:
return data
if data.api.bootstrap.nvr.id == nvr_id:
return data
all_data.append(data)
@ -160,6 +163,27 @@ class VideoProxyView(ProtectProxyView):
url = "/api/unifiprotect/video/{nvr_id}/{camera_id}/{start}/{end}"
name = "api:unifiprotect_thumbnail"
@callback
def _async_get_camera(self, data: ProtectData, camera_id: str) -> Camera | None:
if (camera := data.api.bootstrap.cameras.get(camera_id)) is not None:
return camera
entity_registry = er.async_get(self.hass)
device_registry = dr.async_get(self.hass)
if (entity := entity_registry.async_get(camera_id)) is None or (
device := device_registry.async_get(entity.device_id or "")
) is None:
return None
macs = [c[1] for c in device.connections if c[0] == dr.CONNECTION_NETWORK_MAC]
for mac in macs:
if (ufp_device := data.api.bootstrap.get_device_from_mac(mac)) is not None:
if isinstance(ufp_device, Camera):
camera = ufp_device
break
return camera
async def get(
self, request: web.Request, nvr_id: str, camera_id: str, start: str, end: str
) -> web.StreamResponse:
@ -169,7 +193,7 @@ class VideoProxyView(ProtectProxyView):
if isinstance(data, web.Response):
return data
camera = data.api.bootstrap.cameras.get(camera_id)
camera = self._async_get_camera(data, camera_id)
if camera is None:
return _404(f"Invalid camera ID: {camera_id}")
if not camera.can_read_media(data.api.bootstrap.auth_user):

View File

@ -111,7 +111,7 @@ async def test_thumbnail(
ufp: MockUFPFixture,
camera: Camera,
) -> None:
"""Test invalid NVR ID in URL."""
"""Test NVR ID in URL."""
ufp.api.get_event_thumbnail = AsyncMock(return_value=b"testtest")
@ -127,6 +127,28 @@ async def test_thumbnail(
ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None)
async def test_thumbnail_entry_id(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
) -> None:
"""Test config entry ID in URL."""
ufp.api.get_event_thumbnail = AsyncMock(return_value=b"testtest")
await init_entry(hass, ufp, [camera])
url = async_generate_thumbnail_url("test_id", ufp.entry.entry_id)
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert response.status == 200
assert response.content_type == "image/jpeg"
assert await response.content.read() == b"testtest"
ufp.api.get_event_thumbnail.assert_called_with("test_id", width=None, height=None)
async def test_video_bad_event(
hass: HomeAssistant,
ufp: MockUFPFixture,
@ -425,3 +447,47 @@ async def test_video(
assert response.status == 200
ufp.api.request.assert_called_once
async def test_video_entity_id(
hass: HomeAssistant,
hass_client: mock_aiohttp_client,
ufp: MockUFPFixture,
camera: Camera,
fixed_now: datetime,
) -> None:
"""Test video URL with no video."""
content = Mock()
content.__anext__ = AsyncMock(side_effect=[b"test", b"test", StopAsyncIteration()])
content.__aiter__ = Mock(return_value=content)
mock_response = Mock()
mock_response.content_length = 8
mock_response.content.iter_chunked = Mock(return_value=content)
ufp.api.request = AsyncMock(return_value=mock_response)
await init_entry(hass, ufp, [camera])
event_start = fixed_now - timedelta(seconds=30)
event = Event(
api=ufp.api,
camera_id=camera.id,
start=event_start,
end=fixed_now,
id="test_id",
type=EventType.MOTION,
score=100,
smart_detect_types=[],
smart_detect_event_ids=[],
)
url = async_generate_event_video_url(event)
url = url.replace(camera.id, "camera.test_camera_high")
http_client = await hass_client()
response = cast(ClientResponse, await http_client.get(url))
assert await response.content.read() == b"testtest"
assert response.status == 200
ufp.api.request.assert_called_once