From 5a0b25479e53888dcba677957cf5369cb27316d6 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 27 May 2023 18:46:46 -0500 Subject: [PATCH] Use httpx instead of requests for mjpeg camera images that need digest to avoid jump to executor (#93244) * Use httpx instead of requests for mjpeg camera images that need digest Avoids jump to executor * Use httpx instead of requests for mjpeg camera images that need digest Avoids jump to executor * stream as well * fix * fix --- homeassistant/components/mjpeg/__init__.py | 2 +- homeassistant/components/mjpeg/camera.py | 96 ++++++++++++++-------- 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/homeassistant/components/mjpeg/__init__.py b/homeassistant/components/mjpeg/__init__.py index 605c8b6c9d5..27131d9d18f 100644 --- a/homeassistant/components/mjpeg/__init__.py +++ b/homeassistant/components/mjpeg/__init__.py @@ -16,7 +16,7 @@ __all__ = [ ] -def setup(hass: HomeAssistant, config: ConfigType) -> bool: +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: """Set up the MJPEG IP Camera integration.""" filter_urllib3_logging() return True diff --git a/homeassistant/components/mjpeg/camera.py b/homeassistant/components/mjpeg/camera.py index 27f5c7a1411..c2ab3b5768c 100644 --- a/homeassistant/components/mjpeg/camera.py +++ b/homeassistant/components/mjpeg/camera.py @@ -2,14 +2,13 @@ from __future__ import annotations import asyncio -from collections.abc import Iterable -from contextlib import closing +from collections.abc import AsyncIterator +from contextlib import suppress import aiohttp from aiohttp import web import async_timeout -import requests -from requests.auth import HTTPBasicAuth, HTTPDigestAuth +import httpx from yarl import URL from homeassistant.components.camera import Camera @@ -29,9 +28,13 @@ from homeassistant.helpers.aiohttp_client import ( ) from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.httpx_client import get_async_client from .const import CONF_MJPEG_URL, CONF_STILL_IMAGE_URL, DOMAIN, LOGGER +TIMEOUT = 10 +BUFFER_SIZE = 102400 + async def async_setup_entry( hass: HomeAssistant, @@ -59,11 +62,11 @@ async def async_setup_entry( ) -def extract_image_from_mjpeg(stream: Iterable[bytes]) -> bytes | None: +async def async_extract_image_from_mjpeg(stream: AsyncIterator[bytes]) -> bytes | None: """Take in a MJPEG stream object, return the jpg from it.""" data = b"" - for chunk in stream: + async for chunk in stream: data += chunk jpg_end = data.find(b"\xff\xd9") @@ -137,12 +140,11 @@ class MjpegCamera(Camera): self._authentication == HTTP_DIGEST_AUTHENTICATION or self._still_image_url is None ): - image = await self.hass.async_add_executor_job(self.camera_image) - return image + return await self._async_digest_camera_image() websession = async_get_clientsession(self.hass, verify_ssl=self._verify_ssl) try: - async with async_timeout.timeout(10): + async with async_timeout.timeout(TIMEOUT): response = await websession.get(self._still_image_url, auth=self._auth) image = await response.read() @@ -156,37 +158,65 @@ class MjpegCamera(Camera): return None - def camera_image( - self, width: int | None = None, height: int | None = None - ) -> bytes | None: - """Return a still image response from the camera.""" - if self._username and self._password: - if self._authentication == HTTP_DIGEST_AUTHENTICATION: - auth: HTTPDigestAuth | HTTPBasicAuth = HTTPDigestAuth( - self._username, self._password - ) - else: - auth = HTTPBasicAuth(self._username, self._password) - req = requests.get( - self._mjpeg_url, - auth=auth, - stream=True, - timeout=10, - verify=self._verify_ssl, - ) - else: - req = requests.get(self._mjpeg_url, stream=True, timeout=10) + def _get_digest_auth(self) -> httpx.DigestAuth: + """Return a DigestAuth object.""" + username = "" if self._username is None else self._username + return httpx.DigestAuth(username, self._password) - with closing(req) as response: - return extract_image_from_mjpeg(response.iter_content(102400)) + async def _async_digest_camera_image(self) -> bytes | None: + """Return a still image response from the camera using digest authentication.""" + client = get_async_client(self.hass, verify_ssl=self._verify_ssl) + auth = self._get_digest_auth() + try: + if self._still_image_url: + # Fallback to MJPEG stream if still image URL is not available + with suppress(asyncio.TimeoutError, httpx.HTTPError): + return ( + await client.get( + self._still_image_url, auth=auth, timeout=TIMEOUT + ) + ).content + + async with client.stream( + "get", self._mjpeg_url, auth=auth, timeout=TIMEOUT + ) as stream: + return await async_extract_image_from_mjpeg( + stream.aiter_bytes(BUFFER_SIZE) + ) + + except asyncio.TimeoutError: + LOGGER.error("Timeout getting camera image from %s", self.name) + + except httpx.HTTPError as err: + LOGGER.error("Error getting new camera image from %s: %s", self.name, err) + + return None + + async def _handle_async_mjpeg_digest_stream( + self, request: web.Request + ) -> web.StreamResponse | None: + """Generate an HTTP MJPEG stream from the camera using digest authentication.""" + async with get_async_client(self.hass, verify_ssl=self._verify_ssl).stream( + "get", self._mjpeg_url, auth=self._get_digest_auth(), timeout=TIMEOUT + ) as stream: + response = web.StreamResponse(headers=stream.headers) + await response.prepare(request) + # Stream until we are done or client disconnects + with suppress(asyncio.TimeoutError, httpx.HTTPError): + async for chunk in stream.aiter_bytes(BUFFER_SIZE): + if not self.hass.is_running: + break + async with async_timeout.timeout(TIMEOUT): + await response.write(chunk) + return response async def handle_async_mjpeg_stream( self, request: web.Request ) -> web.StreamResponse | None: """Generate an HTTP MJPEG stream from the camera.""" - # aiohttp don't support DigestAuth -> Fallback + # aiohttp don't support DigestAuth so we use httpx if self._authentication == HTTP_DIGEST_AUTHENTICATION: - return await super().handle_async_mjpeg_stream(request) + return await self._handle_async_mjpeg_digest_stream(request) # connect to stream websession = async_get_clientsession(self.hass, verify_ssl=self._verify_ssl)