ESPHome media proxy (#123254)

* Add ffmpeg proxy view

* Add tests

* Add proxy to media player

* Add proxy test

* Only allow one ffmpeg proc per device

* Incorporate feedback

* Fix tests

* address comments

* Fix test

* Update paths without auth const

---------

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
This commit is contained in:
Michael Hansen 2024-09-08 21:22:35 -05:00 committed by GitHub
parent a85ccb94e3
commit 8884465262
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 553 additions and 7 deletions

View File

@ -4,7 +4,7 @@ from __future__ import annotations
from aioesphomeapi import APIClient
from homeassistant.components import zeroconf
from homeassistant.components import ffmpeg, zeroconf
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
@ -15,12 +15,13 @@ from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .const import CONF_NOISE_PSK, DOMAIN
from .const import CONF_NOISE_PSK, DATA_FFMPEG_PROXY, DOMAIN
from .dashboard import async_setup as async_setup_dashboard
from .domain_data import DomainData
# Import config flow so that it's added to the registry
from .entry_data import ESPHomeConfigEntry, RuntimeEntryData
from .ffmpeg_proxy import FFmpegProxyData, FFmpegProxyView
from .manager import ESPHomeManager, cleanup_instance
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
@ -30,7 +31,12 @@ CLIENT_INFO = f"Home Assistant {ha_version}"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the esphome component.

    Creates the shared ffmpeg proxy state, sets up the dashboard and
    registers the HTTP view that serves converted media.
    """
    # Shared state is stored before the dashboard is set up, as in the
    # original ordering.
    ffmpeg_proxy_data = FFmpegProxyData()
    hass.data[DATA_FFMPEG_PROXY] = ffmpeg_proxy_data

    await async_setup_dashboard(hass)

    ffmpeg_manager = ffmpeg.get_ffmpeg_manager(hass)
    proxy_view = FFmpegProxyView(ffmpeg_manager, ffmpeg_proxy_data)
    hass.http.register_view(proxy_view)

    return True

View File

@ -18,3 +18,5 @@ PROJECT_URLS = {
"esphome.bluetooth-proxy": "https://esphome.github.io/bluetooth-proxies/",
}
DEFAULT_URL = f"https://esphome.io/changelog/{STABLE_BLE_VERSION_STR}.html"
DATA_FFMPEG_PROXY = f"{DOMAIN}.ffmpeg_proxy"

View File

@ -0,0 +1,227 @@
"""HTTP view that converts audio from a URL to a preferred format."""
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from http import HTTPStatus
import logging
import secrets
from aiohttp import web
from aiohttp.abc import AbstractStreamWriter, BaseRequest
from homeassistant.components.ffmpeg import FFmpegManager
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant
from .const import DATA_FFMPEG_PROXY
_LOGGER = logging.getLogger(__name__)
def async_create_proxy_url(
    hass: HomeAssistant,
    device_id: str,
    media_url: str,
    media_format: str,
    rate: int | None = None,
    channels: int | None = None,
) -> str:
    """Create a one-time use proxy URL that automatically converts the media.

    Looks up the proxy data stored in ``hass.data`` and delegates to its
    ``async_create_proxy_url`` method, returning the resulting URL path.
    """
    proxy_data: FFmpegProxyData = hass.data[DATA_FFMPEG_PROXY]
    proxy_url = proxy_data.async_create_proxy_url(
        device_id,
        media_url,
        media_format,
        rate,
        channels,
    )
    return proxy_url
@dataclass
class FFmpegConversionInfo:
"""Information for ffmpeg conversion."""
url: str
"""Source URL of media to convert."""
media_format: str
"""Target format for media (mp3, flac, etc.)"""
rate: int | None
"""Target sample rate (None to keep source rate)."""
channels: int | None
"""Target number of channels (None to keep source channels)."""
@dataclass
class FFmpegProxyData:
    """State shared between the proxy URL factory and the HTTP view."""

    # Pending conversions, keyed as device_id -> convert_id -> info
    conversions: dict[str, dict[str, FFmpegConversionInfo]] = field(
        default_factory=lambda: defaultdict(dict)
    )

    # ffmpeg process per device, keyed as device_id -> process
    processes: dict[str, asyncio.subprocess.Process] = field(default_factory=dict)

    def async_create_proxy_url(
        self,
        device_id: str,
        media_url: str,
        media_format: str,
        rate: int | None,
        channels: int | None,
    ) -> str:
        """Create a one-time use proxy URL that automatically converts the media.

        A random conversion id is generated and the conversion parameters are
        stored under it for the given device. The returned value is a URL
        path (no scheme or host) that embeds the device id, the conversion id
        and the target format as the file extension.
        """
        token = secrets.token_urlsafe(16)
        info = FFmpegConversionInfo(
            url=media_url,
            media_format=media_format,
            rate=rate,
            channels=channels,
        )
        self.conversions[device_id][token] = info
        _LOGGER.debug("Media URL allowed by proxy: %s", media_url)

        return f"/api/esphome/ffmpeg_proxy/{device_id}/{token}.{media_format}"
class FFmpegConvertResponse(web.StreamResponse):
    """HTTP streaming response that uses ffmpeg to convert audio from a URL."""

    def __init__(
        self,
        manager: FFmpegManager,
        convert_info: FFmpegConversionInfo,
        device_id: str,
        proxy_data: FFmpegProxyData,
        chunk_size: int = 2048,
    ) -> None:
        """Initialize response.

        Parameters
        ----------
        manager: FFmpegManager
            ffmpeg manager
        convert_info: FFmpegConversionInfo
            Information necessary to do the conversion
        device_id: str
            ESPHome device id
        proxy_data: FFmpegProxyData
            Data object to store ffmpeg process
        chunk_size: int
            Number of bytes to read from ffmpeg process at a time

        """
        super().__init__(status=200)
        self.hass = manager.hass
        self.manager = manager
        self.convert_info = convert_info
        self.device_id = device_id
        self.proxy_data = proxy_data
        self.chunk_size = chunk_size

    async def prepare(self, request: BaseRequest) -> AbstractStreamWriter | None:
        """Stream url through ffmpeg conversion and out to HTTP client.

        Starts ffmpeg with stdout/stderr piped, registers the process for the
        device, and forwards stdout chunks to the client until ffmpeg closes
        stdout, the client disconnects, or Home Assistant stops running. The
        process is always stopped and reaped before this method returns or
        raises.
        """
        writer = await super().prepare(request)
        assert writer is not None

        command_args = self._build_command_args()
        _LOGGER.debug("%s %s", self.manager.binary, " ".join(command_args))

        proc = await asyncio.create_subprocess_exec(
            self.manager.binary,
            *command_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        assert proc.stdout is not None
        assert proc.stderr is not None

        # Only one conversion process per device is allowed
        self.proxy_data.processes[self.device_id] = proc

        # Drain stderr concurrently: if nobody reads the pipe, ffmpeg blocks
        # once the pipe buffer is full and the stream stalls.
        stderr_task = asyncio.create_task(self._read_stderr_tail(proc.stderr))

        try:
            # Pull audio chunks from ffmpeg and pass them to the HTTP client.
            # Read until EOF rather than until the process exits; otherwise
            # data still buffered after a fast exit would be dropped.
            while (
                self.hass.is_running
                and (request.transport is not None)
                and (not request.transport.is_closing())
                and (chunk := await proc.stdout.read(self.chunk_size))
            ):
                await writer.write(chunk)
                await writer.drain()
        finally:
            # Runs before any write to the client so a broken connection can
            # never prevent the process from being stopped.
            await self._async_stop_process(proc, stderr_task)

        # Close connection, unless the client is already gone
        if (request.transport is not None) and (not request.transport.is_closing()):
            await writer.write_eof()

        return writer

    def _build_command_args(self) -> list[str]:
        """Return the ffmpeg arguments for this conversion (output to stdout)."""
        command_args = [
            "-i",
            self.convert_info.url,
            "-f",
            self.convert_info.media_format,
        ]

        if self.convert_info.rate is not None:
            # Sample rate
            command_args.extend(["-ar", str(self.convert_info.rate)])

        if self.convert_info.channels is not None:
            # Number of channels
            command_args.extend(["-ac", str(self.convert_info.channels)])

        # Output to stdout
        command_args.append("pipe:")

        return command_args

    @staticmethod
    async def _read_stderr_tail(
        stream: asyncio.StreamReader, max_bytes: int = 8192
    ) -> str:
        """Read stream to EOF and return at most its last max_bytes as text."""
        tail = bytearray()
        while data := await stream.read(4096):
            tail += data
            if len(tail) > max_bytes:
                # Keep memory bounded for long-running streams
                del tail[:-max_bytes]
        return tail.decode(errors="replace")

    async def _async_stop_process(
        self,
        proc: asyncio.subprocess.Process,
        stderr_task: asyncio.Task[str],
    ) -> None:
        """Stop ffmpeg if still running, reap it, and log how it ended."""
        # Drop our registration unless a newer request already replaced it
        if self.proxy_data.processes.get(self.device_id) is proc:
            del self.proxy_data.processes[self.device_id]

        if proc.returncode is None:
            try:
                # Terminate hangs, so kill is used
                proc.kill()
            except ProcessLookupError:
                # Process exited between the check and the kill
                pass

        # Wait so that the return code is final before it is inspected
        returncode = await proc.wait()
        stderr_text = await stderr_task

        if returncode == 0:
            _LOGGER.debug("Conversion completed: %s", self.convert_info)
        elif returncode < 0:
            # Negative return code: ended by a signal (e.g. the kill above or
            # a newer request for the same device), not an ffmpeg failure.
            _LOGGER.debug(
                "ffmpeg stopped before conversion finished (return code %s): %s",
                returncode,
                stderr_text,
            )
        else:
            # Process did not exit successfully
            _LOGGER.error("Error shutting down ffmpeg: %s", stderr_text)
class FFmpegProxyView(HomeAssistantView):
    """FFmpeg web view to convert audio and stream back to client."""

    # Requests are not authenticated; access is gated by the one-time
    # conversion id embedded in the URL.
    requires_auth = False
    url = "/api/esphome/ffmpeg_proxy/{device_id}/{filename}"
    name = "api:esphome:ffmpeg_proxy"

    def __init__(self, manager: FFmpegManager, proxy_data: FFmpegProxyData) -> None:
        """Initialize an ffmpeg view."""
        self.manager = manager
        self.proxy_data = proxy_data

    async def get(
        self, request: web.Request, device_id: str, filename: str
    ) -> web.StreamResponse:
        """Start a get request.

        Returns a 400 response when the conversion id is unknown (or was
        already used); otherwise stops any running conversion for the device
        and returns a streaming response that performs the conversion.
        """
        # {id}.mp3 -> id (everything before the first dot)
        convert_id = filename.partition(".")[0]

        # Look up with .get(): conversions is a defaultdict, so indexing it
        # with an unknown device id would create a new entry for every
        # request to this unauthenticated endpoint.
        device_conversions = self.proxy_data.conversions.get(device_id)
        convert_info = (
            device_conversions.pop(convert_id, None) if device_conversions else None
        )

        if convert_info is None:
            _LOGGER.error(
                "Unrecognized convert id %s for device: %s", convert_id, device_id
            )
            return web.Response(
                body="Convert id not recognized", status=HTTPStatus.BAD_REQUEST
            )

        if not device_conversions:
            # Last pending conversion for this device was consumed
            del self.proxy_data.conversions[device_id]

        # Stop any existing process
        proc = self.proxy_data.processes.pop(device_id, None)
        if (proc is not None) and (proc.returncode is None):
            _LOGGER.debug("Stopping existing ffmpeg process for device: %s", device_id)

            # Terminate hangs, so kill is used
            proc.kill()

        # Stream converted audio back to client
        return FFmpegConvertResponse(
            self.manager, convert_info, device_id, self.proxy_data
        )

View File

@ -4,7 +4,7 @@
"after_dependencies": ["zeroconf", "tag"],
"codeowners": ["@OttoWinter", "@jesserockz", "@kbx81", "@bdraco"],
"config_flow": true,
"dependencies": ["assist_pipeline", "bluetooth", "intent"],
"dependencies": ["assist_pipeline", "bluetooth", "intent", "ffmpeg", "http"],
"dhcp": [
{
"registered_devices": true

View File

@ -3,14 +3,18 @@
from __future__ import annotations
from functools import partial
import logging
from typing import Any, cast
from urllib.parse import urlparse
from aioesphomeapi import (
EntityInfo,
MediaPlayerCommand,
MediaPlayerEntityState,
MediaPlayerFormatPurpose,
MediaPlayerInfo,
MediaPlayerState as EspMediaPlayerState,
MediaPlayerSupportedFormat,
)
from homeassistant.components import media_source
@ -34,6 +38,9 @@ from .entity import (
platform_async_setup_entry,
)
from .enum_mapper import EsphomeEnumMapper
from .ffmpeg_proxy import async_create_proxy_url
_LOGGER = logging.getLogger(__name__)
_STATES: EsphomeEnumMapper[EspMediaPlayerState, MediaPlayerState] = EsphomeEnumMapper(
{
@ -66,7 +73,7 @@ class EsphomeMediaPlayer(
if self._static_info.supports_pause:
flags |= MediaPlayerEntityFeature.PAUSE | MediaPlayerEntityFeature.PLAY
self._attr_supported_features = flags
self._entry_data.media_player_formats[self.entity_id] = cast(
self._entry_data.media_player_formats[static_info.unique_id] = cast(
MediaPlayerInfo, static_info
).supported_formats
@ -102,6 +109,22 @@ class EsphomeMediaPlayer(
media_id = async_process_play_media_url(self.hass, media_id)
announcement = kwargs.get(ATTR_MEDIA_ANNOUNCE)
supported_formats: list[MediaPlayerSupportedFormat] | None = (
self._entry_data.media_player_formats.get(self._static_info.unique_id)
)
if (
supported_formats
and _is_url(media_id)
and (
proxy_url := self._get_proxy_url(
supported_formats, media_id, announcement is True
)
)
):
# Substitute proxy URL
media_id = proxy_url
self._client.media_player_command(
self._key, media_url=media_id, announcement=announcement
)
@ -111,6 +134,54 @@ class EsphomeMediaPlayer(
await super().async_will_remove_from_hass()
self._entry_data.media_player_formats.pop(self.entity_id, None)
def _get_proxy_url(
    self,
    supported_formats: list[MediaPlayerSupportedFormat],
    url: str,
    announcement: bool,
) -> str | None:
    """Get URL for ffmpeg proxy.

    Returns None when the entity has no device entry or when none of the
    supported formats is usable; otherwise returns the processed proxy URL
    for the selected format.
    """
    device_entry = self.device_entry
    if device_entry is None:
        # Device id is required
        return None

    # For announcements the first announcement format wins; otherwise (or
    # when there is none) fall back to the first default format.
    selected: MediaPlayerSupportedFormat | None = None
    if announcement:
        selected = next(
            (
                candidate
                for candidate in supported_formats
                if candidate.purpose == MediaPlayerFormatPurpose.ANNOUNCEMENT
            ),
            None,
        )
    if selected is None:
        selected = next(
            (
                candidate
                for candidate in supported_formats
                if candidate.purpose == MediaPlayerFormatPurpose.DEFAULT
            ),
            None,
        )

    if selected is None:
        # No format for conversion
        return None

    # The device is handed a URL served by Home Assistant instead of the
    # original media URL; the conversion parameters come from the selected
    # format.
    _LOGGER.debug("Proxying media url %s with format %s", url, selected)
    relative_proxy_url = async_create_proxy_url(
        self.hass,
        device_entry.id,
        url,
        media_format=selected.format,
        rate=selected.sample_rate,
        channels=selected.num_channels,
    )

    # Resolve URL
    return async_process_play_media_url(self.hass, relative_proxy_url)
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
@ -152,6 +223,12 @@ class EsphomeMediaPlayer(
)
def _is_url(url: str) -> bool:
"""Validate the URL can be parsed and at least has scheme + netloc."""
result = urlparse(url)
return all([result.scheme, result.netloc])
async_setup_entry = partial(
platform_async_setup_entry,
info_type=MediaPlayerInfo,

View File

@ -23,7 +23,7 @@ from homeassistant.helpers.network import (
from .const import CONTENT_AUTH_EXPIRY_TIME, MediaClass, MediaType
# Paths that we don't need to sign
PATHS_WITHOUT_AUTH = ("/api/tts_proxy/",)
PATHS_WITHOUT_AUTH = ("/api/tts_proxy/", "/api/esphome/ffmpeg_proxy/")
@callback

View File

@ -0,0 +1,111 @@
"""Tests for ffmpeg proxy view."""
from http import HTTPStatus
import io
import tempfile
from unittest.mock import patch
from urllib.request import pathname2url
import wave
import mutagen
from homeassistant.components import esphome
from homeassistant.components.esphome.ffmpeg_proxy import async_create_proxy_url
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.typing import ClientSessionGenerator
async def test_async_create_proxy_url(hass: HomeAssistant) -> None:
    """Test that async_create_proxy_url returns the correct format."""
    assert await async_setup_component(hass, "esphome", {})

    device_id, convert_id, media_format = "test-device", "test-id", "flac"
    media_url = "http://127.0.0.1/test.mp3"
    expected_url = (
        f"/api/esphome/ffmpeg_proxy/{device_id}/{convert_id}.{media_format}"
    )

    # Pin the random conversion id so the URL is predictable
    with patch(
        "homeassistant.components.esphome.ffmpeg_proxy.secrets.token_urlsafe",
        return_value=convert_id,
    ):
        actual_url = async_create_proxy_url(hass, device_id, media_url, media_format)

    assert actual_url == expected_url
async def test_proxy_view(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
) -> None:
    """Test proxy HTTP view for converting audio.

    Writes one second of silent 16 kHz mono WAV audio to a temporary file,
    requests it through the proxy as MP3 at 22050 Hz stereo, and checks the
    properties of the returned audio.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    with tempfile.NamedTemporaryFile(mode="wb+", suffix=".wav") as temp_file:
        # 16-bit mono at 16 kHz; bytes(n) is n zero bytes, i.e. silence
        with wave.open(temp_file.name, "wb") as wav_file:
            wav_file.setframerate(16000)
            wav_file.setsampwidth(2)
            wav_file.setnchannels(1)
            wav_file.writeframes(bytes(16000 * 2))  # 1s

        temp_file.seek(0)
        wav_url = pathname2url(temp_file.name)
        convert_id = "test-id"
        url = f"/api/esphome/ffmpeg_proxy/{device_id}/{convert_id}.mp3"

        # Should fail because we haven't allowed the URL yet
        req = await client.get(url)
        assert req.status == HTTPStatus.BAD_REQUEST

        # Allow the URL
        # (token_urlsafe is patched so the generated id matches convert_id)
        with patch(
            "homeassistant.components.esphome.ffmpeg_proxy.secrets.token_urlsafe",
            return_value=convert_id,
        ):
            assert (
                async_create_proxy_url(
                    hass, device_id, wav_url, media_format="mp3", rate=22050, channels=2
                )
                == url
            )

        req = await client.get(url)
        assert req.status == HTTPStatus.OK

        # Read the body while the temporary source file still exists
        mp3_data = await req.content.read()

    # Verify conversion
    with io.BytesIO(mp3_data) as mp3_io:
        mp3_file = mutagen.File(mp3_io)
        assert mp3_file.info.sample_rate == 22050
        assert mp3_file.info.channels == 2

        # About a second, but not exact
        assert round(mp3_file.info.length, 0) == 1
async def test_ffmpeg_error(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
) -> None:
    """Test proxy HTTP view with an ffmpeg error."""
    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    http_client = await hass_client()

    # Register a conversion whose source does not exist
    proxy_url = async_create_proxy_url(
        hass, "1234", "missing-file", media_format="mp3"
    )
    response = await http_client.get(proxy_url)

    # ffmpeg was started, so the status is OK even though the conversion
    # fails; the body is simply empty.
    assert response.status == HTTPStatus.OK
    body = await response.content.read()
    assert not body

View File

@ -1,13 +1,19 @@
"""Test ESPHome media_players."""
from collections.abc import Awaitable, Callable
from unittest.mock import AsyncMock, Mock, call, patch
from aioesphomeapi import (
APIClient,
EntityInfo,
EntityState,
MediaPlayerCommand,
MediaPlayerEntityState,
MediaPlayerFormatPurpose,
MediaPlayerInfo,
MediaPlayerState,
MediaPlayerSupportedFormat,
UserService,
)
import pytest
@ -31,8 +37,11 @@ from homeassistant.components.media_player import (
)
from homeassistant.const import ATTR_ENTITY_ID
from homeassistant.core import HomeAssistant
import homeassistant.helpers.device_registry as dr
from homeassistant.setup import async_setup_component
from .conftest import MockESPHomeDevice
from tests.common import mock_platform
from tests.typing import WebSocketGenerator
@ -55,7 +64,7 @@ async def test_media_player_entity(
key=1, volume=50, muted=True, state=MediaPlayerState.PAUSED
)
]
user_service = []
user_service: list[UserService] = []
await mock_generic_device_entry(
mock_client=mock_client,
entity_info=entity_info,
@ -200,7 +209,7 @@ async def test_media_player_entity_with_source(
key=1, volume=50, muted=True, state=MediaPlayerState.PLAYING
)
]
user_service = []
user_service: list[UserService] = []
await mock_generic_device_entry(
mock_client=mock_client,
entity_info=entity_info,
@ -277,3 +286,117 @@ async def test_media_player_entity_with_source(
mock_client.media_player_command.assert_has_calls(
[call(1, media_url="media-source://tts?message=hello", announcement=True)]
)
async def test_media_player_proxy(
    hass: HomeAssistant,
    device_registry: dr.DeviceRegistry,
    mock_client: APIClient,
    mock_esphome_device: Callable[
        [APIClient, list[EntityInfo], list[UserService], list[EntityState]],
        Awaitable[MockESPHomeDevice],
    ],
) -> None:
    """Test a media_player entity with a proxy URL.

    The mocked device advertises two default formats (flac, mp3) and one
    announcement format (wav). A normal play request is expected to use the
    first default format, and an announcement the announcement format.
    """
    mock_device: MockESPHomeDevice = await mock_esphome_device(
        mock_client=mock_client,
        entity_info=[
            MediaPlayerInfo(
                object_id="mymedia_player",
                key=1,
                name="my media_player",
                unique_id="my_media_player",
                supports_pause=True,
                supported_formats=[
                    MediaPlayerSupportedFormat(
                        format="flac",
                        sample_rate=48000,
                        num_channels=2,
                        purpose=MediaPlayerFormatPurpose.DEFAULT,
                    ),
                    MediaPlayerSupportedFormat(
                        format="wav",
                        sample_rate=16000,
                        num_channels=1,
                        purpose=MediaPlayerFormatPurpose.ANNOUNCEMENT,
                    ),
                    MediaPlayerSupportedFormat(
                        format="mp3",
                        sample_rate=48000,
                        num_channels=2,
                        purpose=MediaPlayerFormatPurpose.DEFAULT,
                    ),
                ],
            )
        ],
        user_service=[],
        states=[
            MediaPlayerEntityState(
                key=1, volume=50, muted=False, state=MediaPlayerState.PAUSED
            )
        ],
    )
    await hass.async_block_till_done()

    # The device registry entry supplies the id used in the proxy URL below
    dev = device_registry.async_get_device(
        connections={(dr.CONNECTION_NETWORK_MAC, mock_device.entry.unique_id)}
    )
    assert dev is not None

    state = hass.states.get("media_player.test_mymedia_player")
    assert state is not None
    assert state.state == "paused"

    media_url = "http://127.0.0.1/test.mp3"
    proxy_url = f"/api/esphome/ffmpeg_proxy/{dev.id}/test-id.flac"

    # Patch the name imported into media_player so no real proxy URL is made
    with (
        patch(
            "homeassistant.components.esphome.media_player.async_create_proxy_url",
            return_value=proxy_url,
        ) as mock_async_create_proxy_url,
    ):
        await hass.services.async_call(
            MEDIA_PLAYER_DOMAIN,
            SERVICE_PLAY_MEDIA,
            {
                ATTR_ENTITY_ID: "media_player.test_mymedia_player",
                ATTR_MEDIA_CONTENT_TYPE: MediaType.MUSIC,
                ATTR_MEDIA_CONTENT_ID: media_url,
            },
            blocking=True,
        )

        # Should be the default format
        mock_async_create_proxy_url.assert_called_once()
        # The device id is taken from the recorded call (second positional arg)
        device_id = mock_async_create_proxy_url.call_args[0][1]
        mock_async_create_proxy_url.assert_called_once_with(
            hass, device_id, media_url, media_format="flac", rate=48000, channels=2
        )

        media_args = mock_client.media_player_command.call_args.kwargs
        assert not media_args["announcement"]

        # Reset
        mock_async_create_proxy_url.reset_mock()

        # Set announcement flag
        await hass.services.async_call(
            MEDIA_PLAYER_DOMAIN,
            SERVICE_PLAY_MEDIA,
            {
                ATTR_ENTITY_ID: "media_player.test_mymedia_player",
                ATTR_MEDIA_CONTENT_TYPE: MediaType.MUSIC,
                ATTR_MEDIA_CONTENT_ID: media_url,
                ATTR_MEDIA_ANNOUNCE: True,
            },
            blocking=True,
        )

        # Should be the announcement format
        mock_async_create_proxy_url.assert_called_once()
        device_id = mock_async_create_proxy_url.call_args[0][1]
        mock_async_create_proxy_url.assert_called_once_with(
            hass, device_id, media_url, media_format="wav", rate=16000, channels=1
        )

        media_args = mock_client.media_player_command.call_args.kwargs
        assert media_args["announcement"]