Add motionEye media browser (#53436)

This commit is contained in:
Dermot Duffy 2021-10-31 08:59:31 -07:00 committed by GitHub
parent 3c5799e394
commit ab7d8db481
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 1090 additions and 2 deletions

View File

@ -3,9 +3,11 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
import contextlib
from http import HTTPStatus
import json
import logging
import os
from types import MappingProxyType
from typing import Any
from urllib.parse import urlencode, urljoin
@ -15,13 +17,17 @@ from motioneye_client.client import (
MotionEyeClient,
MotionEyeClientError,
MotionEyeClientInvalidAuthError,
MotionEyeClientPathError,
)
from motioneye_client.const import (
KEY_CAMERAS,
KEY_HTTP_METHOD_POST_JSON,
KEY_ID,
KEY_NAME,
KEY_ROOT_DIRECTORY,
KEY_WEB_HOOK_CONVERSION_SPECIFIERS,
KEY_WEB_HOOK_CS_FILE_PATH,
KEY_WEB_HOOK_CS_FILE_TYPE,
KEY_WEB_HOOK_NOTIFICATIONS_ENABLED,
KEY_WEB_HOOK_NOTIFICATIONS_HTTP_METHOD,
KEY_WEB_HOOK_NOTIFICATIONS_URL,
@ -31,6 +37,7 @@ from motioneye_client.const import (
)
from homeassistant.components.camera.const import DOMAIN as CAMERA_DOMAIN
from homeassistant.components.media_source.const import URI_SCHEME
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.components.switch import DOMAIN as SWITCH_DOMAIN
from homeassistant.components.webhook import (
@ -74,6 +81,8 @@ from .const import (
DOMAIN,
EVENT_FILE_STORED,
EVENT_FILE_STORED_KEYS,
EVENT_FILE_URL,
EVENT_MEDIA_CONTENT_ID,
EVENT_MOTION_DETECTED,
EVENT_MOTION_DETECTED_KEYS,
MOTIONEYE_MANUFACTURER,
@ -101,6 +110,20 @@ def get_motioneye_device_identifier(
return (DOMAIN, f"{config_entry_id}_{camera_id}")
def split_motioneye_device_identifier(
identifier: tuple[str, str]
) -> tuple[str, str, int] | None:
"""Get the identifiers for a motionEye device."""
if len(identifier) != 2 or identifier[0] != DOMAIN or "_" not in identifier[1]:
return None
config_id, camera_id_str = identifier[1].split("_", 1)
try:
camera_id = int(camera_id_str)
except ValueError:
return None
return (DOMAIN, config_id, camera_id)
def get_motioneye_entity_unique_id(
config_entry_id: str, camera_id: int, entity_type: str
) -> str:
@ -428,6 +451,21 @@ async def handle_webhook(
status=HTTPStatus.BAD_REQUEST,
)
if KEY_WEB_HOOK_CS_FILE_PATH in data and KEY_WEB_HOOK_CS_FILE_TYPE in data:
try:
event_file_type = int(data[KEY_WEB_HOOK_CS_FILE_TYPE])
except ValueError:
pass
else:
data.update(
_get_media_event_data(
hass,
device,
data[KEY_WEB_HOOK_CS_FILE_PATH],
event_file_type,
)
)
hass.bus.async_fire(
f"{DOMAIN}.{event_type}",
{
@ -440,6 +478,69 @@ async def handle_webhook(
return None
def _get_media_event_data(
hass: HomeAssistant,
device: dr.DeviceEntry,
event_file_path: str,
event_file_type: int,
) -> dict[str, str]:
config_entry_id = next(iter(device.config_entries), None)
if not config_entry_id or config_entry_id not in hass.data[DOMAIN]:
return {}
config_entry_data = hass.data[DOMAIN][config_entry_id]
client = config_entry_data[CONF_CLIENT]
coordinator = config_entry_data[CONF_COORDINATOR]
for identifier in device.identifiers:
data = split_motioneye_device_identifier(identifier)
if data is not None:
camera_id = data[2]
camera = get_camera_from_cameras(camera_id, coordinator.data)
break
else:
return {}
root_directory = camera.get(KEY_ROOT_DIRECTORY) if camera else None
if root_directory is None:
return {}
kind = "images" if client.is_file_type_image(event_file_type) else "movies"
# The file_path in the event is the full local filesystem path to the
# media. To convert that to the media path that motionEye will
# understand, we need to strip the root directory from the path.
if os.path.commonprefix([root_directory, event_file_path]) != root_directory:
return {}
file_path = "/" + os.path.relpath(event_file_path, root_directory)
output = {
EVENT_MEDIA_CONTENT_ID: (
f"{URI_SCHEME}{DOMAIN}/{config_entry_id}#{device.id}#{kind}#{file_path}"
),
}
url = get_media_url(
client,
camera_id,
file_path,
kind == "images",
)
if url:
output[EVENT_FILE_URL] = url
return output
def get_media_url(
client: MotionEyeClient, camera_id: int, path: str, image: bool
) -> str | None:
"""Get the URL for a motionEye media item."""
with contextlib.suppress(MotionEyeClientPathError):
if image:
return client.get_image_url(camera_id, path)
return client.get_movie_url(camera_id, path)
return None
class MotionEyeEntity(CoordinatorEntity):
"""Base class for motionEye entities."""

View File

@ -80,6 +80,9 @@ EVENT_FILE_STORED_KEYS: Final = [
KEY_WEB_HOOK_CS_MOTION_VERSION,
]
EVENT_FILE_URL: Final = "file_url"
EVENT_MEDIA_CONTENT_ID: Final = "media_content_id"
MOTIONEYE_MANUFACTURER: Final = "motionEye"
SERVICE_SET_TEXT_OVERLAY: Final = "set_text_overlay"

View File

@ -5,6 +5,7 @@
"config_flow": true,
"dependencies": [
"http",
"media_source",
"webhook"
],
"requirements": [

View File

@ -0,0 +1,349 @@
"""motionEye Media Source Implementation."""
from __future__ import annotations
import logging
from pathlib import PurePath
from typing import Optional, Tuple, cast
from motioneye_client.const import KEY_MEDIA_LIST, KEY_MIME_TYPE, KEY_PATH
from homeassistant.components.media_player.const import (
MEDIA_CLASS_DIRECTORY,
MEDIA_CLASS_IMAGE,
MEDIA_CLASS_MOVIE,
MEDIA_CLASS_VIDEO,
)
from homeassistant.components.media_source.error import MediaSourceError, Unresolvable
from homeassistant.components.media_source.models import (
BrowseMediaSource,
MediaSource,
MediaSourceItem,
PlayMedia,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from . import get_media_url, split_motioneye_device_identifier
from .const import CONF_CLIENT, DOMAIN
MIME_TYPE_MAP = {
"movies": "video/mp4",
"images": "image/jpeg",
}
MEDIA_CLASS_MAP = {
"movies": MEDIA_CLASS_VIDEO,
"images": MEDIA_CLASS_IMAGE,
}
_LOGGER = logging.getLogger(__name__)
# Hierarchy:
#
# url (e.g. http://my-motioneye-1, http://my-motioneye-2)
# -> Camera (e.g. "Office", "Kitchen")
# -> kind (e.g. Images, Movies)
# -> path hierarchy as configured on motionEye
async def async_get_media_source(hass: HomeAssistant) -> MotionEyeMediaSource:
"""Set up motionEye media source."""
return MotionEyeMediaSource(hass)
class MotionEyeMediaSource(MediaSource):
"""Provide motionEye stills and videos as media sources."""
name: str = "motionEye Media"
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize MotionEyeMediaSource."""
super().__init__(DOMAIN)
self.hass = hass
async def async_resolve_media(self, item: MediaSourceItem) -> PlayMedia:
"""Resolve media to a url."""
config_id, device_id, kind, path = self._parse_identifier(item.identifier)
if not config_id or not device_id or not kind or not path:
raise Unresolvable(
f"Incomplete media identifier specified: {item.identifier}"
)
config = self._get_config_or_raise(config_id)
device = self._get_device_or_raise(device_id)
self._verify_kind_or_raise(kind)
url = get_media_url(
self.hass.data[DOMAIN][config.entry_id][CONF_CLIENT],
self._get_camera_id_or_raise(config, device),
self._get_path_or_raise(path),
kind == "images",
)
if not url:
raise Unresolvable(f"Could not resolve media item: {item.identifier}")
return PlayMedia(url, MIME_TYPE_MAP[kind])
@callback
@classmethod
def _parse_identifier(
cls, identifier: str
) -> tuple[str | None, str | None, str | None, str | None]:
base = [None] * 4
data = identifier.split("#", 3)
return cast(
Tuple[Optional[str], Optional[str], Optional[str], Optional[str]],
tuple(data + base)[:4], # type: ignore[operator]
)
async def async_browse_media(
self,
item: MediaSourceItem,
) -> BrowseMediaSource:
"""Return media."""
if item.identifier:
config_id, device_id, kind, path = self._parse_identifier(item.identifier)
config = device = None
if config_id:
config = self._get_config_or_raise(config_id)
if device_id:
device = self._get_device_or_raise(device_id)
if kind:
self._verify_kind_or_raise(kind)
path = self._get_path_or_raise(path)
if config and device and kind:
return await self._build_media_path(config, device, kind, path)
if config and device:
return self._build_media_kinds(config, device)
if config:
return self._build_media_devices(config)
return self._build_media_configs()
def _get_config_or_raise(self, config_id: str) -> ConfigEntry:
"""Get a config entry from a URL."""
entry = self.hass.config_entries.async_get_entry(config_id)
if not entry:
raise MediaSourceError(f"Unable to find config entry with id: {config_id}")
return entry
def _get_device_or_raise(self, device_id: str) -> dr.DeviceEntry:
"""Get a config entry from a URL."""
device_registry = dr.async_get(self.hass)
device = device_registry.async_get(device_id)
if not device:
raise MediaSourceError(f"Unable to find device with id: {device_id}")
return device
@classmethod
def _verify_kind_or_raise(cls, kind: str) -> None:
"""Verify kind is an expected value."""
if kind in MEDIA_CLASS_MAP:
return
raise MediaSourceError(f"Unknown media type: {kind}")
@classmethod
def _get_path_or_raise(cls, path: str | None) -> str:
"""Verify path is a valid motionEye path."""
if not path:
return "/"
if PurePath(path).root == "/":
return path
raise MediaSourceError(
f"motionEye media path must start with '/', received: {path}"
)
@classmethod
def _get_camera_id_or_raise(
cls, config: ConfigEntry, device: dr.DeviceEntry
) -> int:
"""Get a config entry from a URL."""
for identifier in device.identifiers:
data = split_motioneye_device_identifier(identifier)
if data is not None:
return data[2]
raise MediaSourceError(f"Could not find camera id for device id: {device.id}")
@classmethod
def _build_media_config(cls, config: ConfigEntry) -> BrowseMediaSource:
return BrowseMediaSource(
domain=DOMAIN,
identifier=config.entry_id,
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type="",
title=config.title,
can_play=False,
can_expand=True,
children_media_class=MEDIA_CLASS_DIRECTORY,
)
def _build_media_configs(self) -> BrowseMediaSource:
"""Build the media sources for config entries."""
return BrowseMediaSource(
domain=DOMAIN,
identifier="",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type="",
title="motionEye Media",
can_play=False,
can_expand=True,
children=[
self._build_media_config(entry)
for entry in self.hass.config_entries.async_entries(DOMAIN)
],
children_media_class=MEDIA_CLASS_DIRECTORY,
)
@classmethod
def _build_media_device(
cls,
config: ConfigEntry,
device: dr.DeviceEntry,
full_title: bool = True,
) -> BrowseMediaSource:
return BrowseMediaSource(
domain=DOMAIN,
identifier=f"{config.entry_id}#{device.id}",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type="",
title=f"{config.title} {device.name}" if full_title else device.name,
can_play=False,
can_expand=True,
children_media_class=MEDIA_CLASS_DIRECTORY,
)
def _build_media_devices(self, config: ConfigEntry) -> BrowseMediaSource:
"""Build the media sources for device entries."""
device_registry = dr.async_get(self.hass)
devices = dr.async_entries_for_config_entry(device_registry, config.entry_id)
base = self._build_media_config(config)
base.children = [
self._build_media_device(config, device, full_title=False)
for device in devices
]
return base
@classmethod
def _build_media_kind(
cls,
config: ConfigEntry,
device: dr.DeviceEntry,
kind: str,
full_title: bool = True,
) -> BrowseMediaSource:
return BrowseMediaSource(
domain=DOMAIN,
identifier=f"{config.entry_id}#{device.id}#{kind}",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type=MEDIA_CLASS_DIRECTORY,
title=(
f"{config.title} {device.name} {kind.title()}"
if full_title
else kind.title()
),
can_play=False,
can_expand=True,
children_media_class=(
MEDIA_CLASS_MOVIE if kind == "movies" else MEDIA_CLASS_IMAGE
),
)
def _build_media_kinds(
self, config: ConfigEntry, device: dr.DeviceEntry
) -> BrowseMediaSource:
base = self._build_media_device(config, device)
base.children = [
self._build_media_kind(config, device, kind, full_title=False)
for kind in MEDIA_CLASS_MAP
]
return base
async def _build_media_path(
self,
config: ConfigEntry,
device: dr.DeviceEntry,
kind: str,
path: str,
) -> BrowseMediaSource:
"""Build the media sources for media kinds."""
base = self._build_media_kind(config, device, kind)
parsed_path = PurePath(path)
if path != "/":
base.title += " " + str(PurePath(*parsed_path.parts[1:]))
base.children = []
client = self.hass.data[DOMAIN][config.entry_id][CONF_CLIENT]
camera_id = self._get_camera_id_or_raise(config, device)
if kind == "movies":
resp = await client.async_get_movies(camera_id)
else:
resp = await client.async_get_images(camera_id)
sub_dirs: set[str] = set()
parts = parsed_path.parts
for media in resp.get(KEY_MEDIA_LIST, []):
if (
KEY_PATH not in media
or KEY_MIME_TYPE not in media
or media[KEY_MIME_TYPE] not in MIME_TYPE_MAP.values()
):
continue
# Example path: '/2021-04-21/21-13-10.mp4'
parts_media = PurePath(media[KEY_PATH]).parts
if parts_media[: len(parts)] == parts and len(parts_media) > len(parts):
full_child_path = str(PurePath(*parts_media[: len(parts) + 1]))
display_child_path = parts_media[len(parts)]
# Child is a media file.
if len(parts) + 1 == len(parts_media):
if kind == "movies":
thumbnail_url = client.get_movie_url(
camera_id, full_child_path, preview=True
)
else:
thumbnail_url = client.get_image_url(
camera_id, full_child_path, preview=True
)
base.children.append(
BrowseMediaSource(
domain=DOMAIN,
identifier=f"{config.entry_id}#{device.id}#{kind}#{full_child_path}",
media_class=MEDIA_CLASS_MAP[kind],
media_content_type=media[KEY_MIME_TYPE],
title=display_child_path,
can_play=(kind == "movies"),
can_expand=False,
thumbnail=thumbnail_url,
)
)
# Child is a subdirectory.
elif len(parts) + 1 < len(parts_media):
if full_child_path not in sub_dirs:
sub_dirs.add(full_child_path)
base.children.append(
BrowseMediaSource(
domain=DOMAIN,
identifier=(
f"{config.entry_id}#{device.id}"
f"#{kind}#{full_child_path}"
),
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type=MEDIA_CLASS_DIRECTORY,
title=display_child_path,
can_play=False,
can_expand=True,
children_media_class=MEDIA_CLASS_DIRECTORY,
)
)
return base

View File

@ -0,0 +1,482 @@
"""Test Local Media Source."""
import logging
from unittest.mock import AsyncMock, Mock, call
from motioneye_client.client import MotionEyeClientPathError
import pytest
from homeassistant.components import media_source
from homeassistant.components.media_source import const
from homeassistant.components.media_source.error import MediaSourceError, Unresolvable
from homeassistant.components.media_source.models import PlayMedia
from homeassistant.components.motioneye.const import DOMAIN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from . import (
TEST_CAMERA_DEVICE_IDENTIFIER,
TEST_CAMERA_ID,
TEST_CONFIG_ENTRY_ID,
create_mock_motioneye_client,
setup_mock_motioneye_config_entry,
)
TEST_MOVIES = {
"mediaList": [
{
"mimeType": "video/mp4",
"sizeStr": "4.7 MB",
"momentStrShort": "25 Apr, 00:26",
"timestamp": 1619335614.0353653,
"momentStr": "25 April 2021, 00:26",
"path": "/2021-04-25/00-26-22.mp4",
},
{
"mimeType": "video/mp4",
"sizeStr": "9.2 MB",
"momentStrShort": "25 Apr, 00:37",
"timestamp": 1619336268.0683491,
"momentStr": "25 April 2021, 00:37",
"path": "/2021-04-25/00-36-49.mp4",
},
{
"mimeType": "video/mp4",
"sizeStr": "28.3 MB",
"momentStrShort": "25 Apr, 00:03",
"timestamp": 1619334211.0403328,
"momentStr": "25 April 2021, 00:03",
"path": "/2021-04-25/00-02-27.mp4",
},
]
}
TEST_IMAGES = {
"mediaList": [
{
"mimeType": "image/jpeg",
"sizeStr": "216.5 kB",
"momentStrShort": "12 Apr, 20:13",
"timestamp": 1618283619.6541321,
"momentStr": "12 April 2021, 20:13",
"path": "/2021-04-12/20-13-39.jpg",
}
],
}
_LOGGER = logging.getLogger(__name__)
async def test_async_browse_media_success(hass: HomeAssistant) -> None:
"""Test successful browse media."""
client = create_mock_motioneye_client()
config = await setup_mock_motioneye_config_entry(hass, client=client)
device_registry = await dr.async_get_registry(hass)
device = device_registry.async_get_or_create(
config_entry_id=config.entry_id,
identifiers={TEST_CAMERA_DEVICE_IDENTIFIER},
)
media = await media_source.async_browse_media(
hass,
f"{const.URI_SCHEME}{DOMAIN}",
)
assert media.as_dict() == {
"title": "motionEye Media",
"media_class": "directory",
"media_content_type": "",
"media_content_id": "media-source://motioneye",
"can_play": False,
"can_expand": True,
"children_media_class": "directory",
"thumbnail": None,
"children": [
{
"title": "http://test:8766",
"media_class": "directory",
"media_content_type": "",
"media_content_id": (
"media-source://motioneye/74565ad414754616000674c87bdc876c"
),
"can_play": False,
"can_expand": True,
"children_media_class": "directory",
"thumbnail": None,
}
],
}
media = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{config.entry_id}"
)
assert media.as_dict() == {
"title": "http://test:8766",
"media_class": "directory",
"media_content_type": "",
"media_content_id": (
"media-source://motioneye/74565ad414754616000674c87bdc876c"
),
"can_play": False,
"can_expand": True,
"children_media_class": "directory",
"thumbnail": None,
"children": [
{
"title": "Test Camera",
"media_class": "directory",
"media_content_type": "",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}"
),
"can_play": False,
"can_expand": True,
"children_media_class": "directory",
"thumbnail": None,
}
],
}
media = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{config.entry_id}#{device.id}"
)
assert media.as_dict() == {
"title": "http://test:8766 Test Camera",
"media_class": "directory",
"media_content_type": "",
"media_content_id": (
f"media-source://motioneye/74565ad414754616000674c87bdc876c#{device.id}"
),
"can_play": False,
"can_expand": True,
"children_media_class": "directory",
"thumbnail": None,
"children": [
{
"title": "Movies",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies"
),
"can_play": False,
"can_expand": True,
"children_media_class": "movie",
"thumbnail": None,
},
{
"title": "Images",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#images"
),
"can_play": False,
"can_expand": True,
"children_media_class": "image",
"thumbnail": None,
},
],
}
client.async_get_movies = AsyncMock(return_value=TEST_MOVIES)
media = await media_source.async_browse_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{config.entry_id}#{device.id}#movies"
)
assert media.as_dict() == {
"title": "http://test:8766 Test Camera Movies",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies"
),
"can_play": False,
"can_expand": True,
"children_media_class": "movie",
"thumbnail": None,
"children": [
{
"title": "2021-04-25",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies#/2021-04-25"
),
"can_play": False,
"can_expand": True,
"children_media_class": "directory",
"thumbnail": None,
}
],
}
client.get_movie_url = Mock(return_value="http://movie")
media = await media_source.async_browse_media(
hass,
f"{const.URI_SCHEME}{DOMAIN}/{config.entry_id}#{device.id}#movies#/2021-04-25",
)
assert media.as_dict() == {
"title": "http://test:8766 Test Camera Movies 2021-04-25",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies"
),
"can_play": False,
"can_expand": True,
"children_media_class": "movie",
"thumbnail": None,
"children": [
{
"title": "00-26-22.mp4",
"media_class": "video",
"media_content_type": "video/mp4",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies#"
"/2021-04-25/00-26-22.mp4"
),
"can_play": True,
"can_expand": False,
"children_media_class": None,
"thumbnail": "http://movie",
},
{
"title": "00-36-49.mp4",
"media_class": "video",
"media_content_type": "video/mp4",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies#"
"/2021-04-25/00-36-49.mp4"
),
"can_play": True,
"can_expand": False,
"children_media_class": None,
"thumbnail": "http://movie",
},
{
"title": "00-02-27.mp4",
"media_class": "video",
"media_content_type": "video/mp4",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies#"
"/2021-04-25/00-02-27.mp4"
),
"can_play": True,
"can_expand": False,
"children_media_class": None,
"thumbnail": "http://movie",
},
],
}
async def test_async_browse_media_images_success(hass: HomeAssistant) -> None:
"""Test successful browse media of images."""
client = create_mock_motioneye_client()
config = await setup_mock_motioneye_config_entry(hass, client=client)
device_registry = await dr.async_get_registry(hass)
device = device_registry.async_get_or_create(
config_entry_id=config.entry_id,
identifiers={TEST_CAMERA_DEVICE_IDENTIFIER},
)
client.async_get_images = AsyncMock(return_value=TEST_IMAGES)
client.get_image_url = Mock(return_value="http://image")
media = await media_source.async_browse_media(
hass,
f"{const.URI_SCHEME}{DOMAIN}/{config.entry_id}#{device.id}#images#/2021-04-12",
)
assert media.as_dict() == {
"title": "http://test:8766 Test Camera Images 2021-04-12",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#images"
),
"can_play": False,
"can_expand": True,
"children_media_class": "image",
"thumbnail": None,
"children": [
{
"title": "20-13-39.jpg",
"media_class": "image",
"media_content_type": "image/jpeg",
"media_content_id": (
"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#images#"
"/2021-04-12/20-13-39.jpg"
),
"can_play": False,
"can_expand": False,
"children_media_class": None,
"thumbnail": "http://image",
}
],
}
async def test_async_resolve_media_success(hass: HomeAssistant) -> None:
"""Test successful resolve media."""
client = create_mock_motioneye_client()
config = await setup_mock_motioneye_config_entry(hass, client=client)
device_registry = await dr.async_get_registry(hass)
device = device_registry.async_get_or_create(
config_entry_id=config.entry_id,
identifiers={TEST_CAMERA_DEVICE_IDENTIFIER},
)
# Test successful resolve for a movie.
client.get_movie_url = Mock(return_value="http://movie-url")
media = await media_source.async_resolve_media(
hass,
(
f"{const.URI_SCHEME}{DOMAIN}"
f"/{TEST_CONFIG_ENTRY_ID}#{device.id}#movies#/foo.mp4"
),
)
assert media == PlayMedia(url="http://movie-url", mime_type="video/mp4")
assert client.get_movie_url.call_args == call(TEST_CAMERA_ID, "/foo.mp4")
# Test successful resolve for an image.
client.get_image_url = Mock(return_value="http://image-url")
media = await media_source.async_resolve_media(
hass,
(
f"{const.URI_SCHEME}{DOMAIN}"
f"/{TEST_CONFIG_ENTRY_ID}#{device.id}#images#/foo.jpg"
),
)
assert media == PlayMedia(url="http://image-url", mime_type="image/jpeg")
assert client.get_image_url.call_args == call(TEST_CAMERA_ID, "/foo.jpg")
async def test_async_resolve_media_failure(hass: HomeAssistant) -> None:
"""Test failed resolve media calls."""
client = create_mock_motioneye_client()
config = await setup_mock_motioneye_config_entry(hass, client=client)
device_registry = await dr.async_get_registry(hass)
device = device_registry.async_get_or_create(
config_entry_id=config.entry_id,
identifiers={TEST_CAMERA_DEVICE_IDENTIFIER},
)
broken_device_1 = device_registry.async_get_or_create(
config_entry_id=config.entry_id,
identifiers={(DOMAIN, config.entry_id)},
)
broken_device_2 = device_registry.async_get_or_create(
config_entry_id=config.entry_id,
identifiers={(DOMAIN, f"{config.entry_id}_NOTINT")},
)
client.get_movie_url = Mock(return_value="http://url")
# URI doesn't contain necessary components.
with pytest.raises(Unresolvable):
await media_source.async_resolve_media(hass, f"{const.URI_SCHEME}{DOMAIN}/foo")
# Config entry doesn't exist.
with pytest.raises(MediaSourceError):
await media_source.async_resolve_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/1#2#3#4"
)
# Device doesn't exist.
with pytest.raises(MediaSourceError):
await media_source.async_resolve_media(
hass, f"{const.URI_SCHEME}{DOMAIN}/{TEST_CONFIG_ENTRY_ID}#2#3#4"
)
# Device identifiers are incorrect (no camera id)
with pytest.raises(MediaSourceError):
await media_source.async_resolve_media(
hass,
(
f"{const.URI_SCHEME}{DOMAIN}"
f"/{TEST_CONFIG_ENTRY_ID}#{broken_device_1.id}#images#4"
),
)
# Device identifiers are incorrect (non integer camera id)
with pytest.raises(MediaSourceError):
await media_source.async_resolve_media(
hass,
(
f"{const.URI_SCHEME}{DOMAIN}"
f"/{TEST_CONFIG_ENTRY_ID}#{broken_device_2.id}#images#4"
),
)
# Kind is incorrect.
with pytest.raises(MediaSourceError):
await media_source.async_resolve_media(
hass,
f"{const.URI_SCHEME}{DOMAIN}/{TEST_CONFIG_ENTRY_ID}#{device.id}#games#moo",
)
# Playback URL raises exception.
client.get_movie_url = Mock(side_effect=MotionEyeClientPathError)
with pytest.raises(Unresolvable):
await media_source.async_resolve_media(
hass,
(
f"{const.URI_SCHEME}{DOMAIN}"
f"/{TEST_CONFIG_ENTRY_ID}#{device.id}#movies#/foo.mp4"
),
)
# Media path does not start with '/'
client.get_movie_url = Mock(side_effect=MotionEyeClientPathError)
with pytest.raises(MediaSourceError):
await media_source.async_resolve_media(
hass,
(
f"{const.URI_SCHEME}{DOMAIN}"
f"/{TEST_CONFIG_ENTRY_ID}#{device.id}#movies#foo.mp4"
),
)
# Media missing path.
broken_movies = {"mediaList": [{}, {"path": "something", "mimeType": "NOT_A_MIME"}]}
client.async_get_movies = AsyncMock(return_value=broken_movies)
media = await media_source.async_browse_media(
hass,
f"{const.URI_SCHEME}{DOMAIN}/{config.entry_id}#{device.id}#movies#/2021-04-25",
)
assert media.as_dict() == {
"title": "http://test:8766 Test Camera Movies 2021-04-25",
"media_class": "directory",
"media_content_type": "directory",
"media_content_id": (
f"media-source://motioneye"
f"/74565ad414754616000674c87bdc876c#{device.id}#movies"
),
"can_play": False,
"can_expand": True,
"children_media_class": "movie",
"thumbnail": None,
"children": [],
}

View File

@ -2,11 +2,12 @@
import copy
from http import HTTPStatus
from typing import Any
from unittest.mock import AsyncMock, call, patch
from unittest.mock import AsyncMock, Mock, call, patch
from motioneye_client.const import (
KEY_CAMERAS,
KEY_HTTP_METHOD_POST_JSON,
KEY_ROOT_DIRECTORY,
KEY_WEB_HOOK_NOTIFICATIONS_ENABLED,
KEY_WEB_HOOK_NOTIFICATIONS_HTTP_METHOD,
KEY_WEB_HOOK_NOTIFICATIONS_URL,
@ -18,6 +19,7 @@ from motioneye_client.const import (
from homeassistant.components.motioneye.const import (
ATTR_EVENT_TYPE,
CONF_WEBHOOK_SET_OVERWRITE,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
EVENT_FILE_STORED,
EVENT_MOTION_DETECTED,
@ -28,6 +30,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.network import NoURLAvailableError
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from . import (
TEST_CAMERA,
@ -36,13 +39,14 @@ from . import (
TEST_CAMERA_ID,
TEST_CAMERA_NAME,
TEST_CAMERAS,
TEST_CONFIG_ENTRY_ID,
TEST_URL,
create_mock_motioneye_client,
create_mock_motioneye_config_entry,
setup_mock_motioneye_config_entry,
)
from tests.common import async_capture_events
from tests.common import async_capture_events, async_fire_time_changed
WEB_HOOK_MOTION_DETECTED_QUERY_STRING = (
"camera_id=%t&changed_pixels=%D&despeckle_labels=%Q&event=%v&fps=%{fps}"
@ -368,3 +372,151 @@ async def test_bad_query_cannot_decode(
assert resp.status == HTTPStatus.BAD_REQUEST
assert not motion_events
assert not storage_events
async def test_event_media_data(hass: HomeAssistant, hass_client_no_auth: Any) -> None:
"""Test an event with a file path generates media data."""
await async_setup_component(hass, "http", {"http": {}})
device_registry = await dr.async_get_registry(hass)
client = create_mock_motioneye_client()
config_entry = await setup_mock_motioneye_config_entry(hass, client=client)
device = device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={TEST_CAMERA_DEVICE_IDENTIFIER},
)
hass_client = await hass_client_no_auth()
events = async_capture_events(hass, f"{DOMAIN}.{EVENT_FILE_STORED}")
client.get_movie_url = Mock(return_value="http://movie-url")
client.get_image_url = Mock(return_value="http://image-url")
# Test: Movie storage.
client.is_file_type_image = Mock(return_value=False)
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": f"/var/lib/motioneye/{TEST_CAMERA_NAME}/dir/one",
"file_type": "8",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 1
assert events[-1].data["file_url"] == "http://movie-url"
assert (
events[-1].data["media_content_id"]
== f"media-source://motioneye/{TEST_CONFIG_ENTRY_ID}#{device.id}#movies#/dir/one"
)
assert client.get_movie_url.call_args == call(TEST_CAMERA_ID, "/dir/one")
# Test: Image storage.
client.is_file_type_image = Mock(return_value=True)
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": f"/var/lib/motioneye/{TEST_CAMERA_NAME}/dir/two",
"file_type": "4",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 2
assert events[-1].data["file_url"] == "http://image-url"
assert (
events[-1].data["media_content_id"]
== f"media-source://motioneye/{TEST_CONFIG_ENTRY_ID}#{device.id}#images#/dir/two"
)
assert client.get_image_url.call_args == call(TEST_CAMERA_ID, "/dir/two")
# Test: Invalid file type.
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": f"/var/lib/motioneye/{TEST_CAMERA_NAME}/dir/three",
"file_type": "NOT_AN_INT",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 3
assert "file_url" not in events[-1].data
assert "media_content_id" not in events[-1].data
# Test: Different file path.
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": "/var/random",
"file_type": "8",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 4
assert "file_url" not in events[-1].data
assert "media_content_id" not in events[-1].data
# Test: Not a loaded motionEye config entry.
wrong_device = device_registry.async_get_or_create(
config_entry_id="wrong_config_id", identifiers={("motioneye", "a_1")}
)
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: wrong_device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": "/var/random",
"file_type": "8",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 5
assert "file_url" not in events[-1].data
assert "media_content_id" not in events[-1].data
# Test: No root directory.
camera = copy.deepcopy(TEST_CAMERA)
del camera[KEY_ROOT_DIRECTORY]
client.async_get_cameras = AsyncMock(return_value={"cameras": [camera]})
async_fire_time_changed(hass, dt_util.utcnow() + DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": f"/var/lib/motioneye/{TEST_CAMERA_NAME}/dir/four",
"file_type": "8",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 6
assert "file_url" not in events[-1].data
assert "media_content_id" not in events[-1].data
# Test: Device has incorrect device identifiers.
device_registry.async_update_device(
device_id=device.id, new_identifiers={("not", "motioneye")}
)
resp = await hass_client.post(
URL_WEBHOOK_PATH.format(webhook_id=config_entry.data[CONF_WEBHOOK_ID]),
json={
ATTR_DEVICE_ID: device.id,
ATTR_EVENT_TYPE: EVENT_FILE_STORED,
"file_path": f"/var/lib/motioneye/{TEST_CAMERA_NAME}/dir/five",
"file_type": "8",
},
)
assert resp.status == HTTPStatus.OK
assert len(events) == 7
assert "file_url" not in events[-1].data
assert "media_content_id" not in events[-1].data