Fix aiohttp connection reset errors (#15577)

* Fix aiohttp connection reset errors

* Update aiohttp_client.py

* Update aiohttp_client.py

* Update __init__.py

* Update mjpeg.py

* Update mjpeg.py

* Update ffmpeg.py

* Update ffmpeg.py

* Update ffmpeg.py

* Update proxy.py

* Update __init__.py

* Update aiohttp_client.py

* Update aiohttp_client.py

* Update proxy.py

* Update proxy.py

* Fix await inside coroutine

* Fix async syntax

* Lint
This commit is contained in:
Pascal Vizeli 2018-07-23 14:36:36 +02:00 committed by Paulus Schoutsen
parent 8213b1476f
commit f3dfc433c2
5 changed files with 46 additions and 69 deletions

View File

@ -301,32 +301,23 @@ class Camera(Entity):
last_image = None last_image = None
try: while True:
while True: img_bytes = await self.async_camera_image()
img_bytes = await self.async_camera_image() if not img_bytes:
if not img_bytes: break
break
if img_bytes and img_bytes != last_image: if img_bytes and img_bytes != last_image:
await write_to_mjpeg_stream(img_bytes)
# Chrome seems to always ignore first picture,
# print it twice.
if last_image is None:
await write_to_mjpeg_stream(img_bytes) await write_to_mjpeg_stream(img_bytes)
last_image = img_bytes
# Chrome seems to always ignore first picture, await asyncio.sleep(interval)
# print it twice.
if last_image is None:
await write_to_mjpeg_stream(img_bytes)
last_image = img_bytes return response
await asyncio.sleep(interval)
except asyncio.CancelledError:
_LOGGER.debug("Stream closed by frontend.")
response = None
raise
finally:
if response is not None:
await response.write_eof()
async def handle_async_mjpeg_stream(self, request): async def handle_async_mjpeg_stream(self, request):
"""Serve an HTTP MJPEG stream from the camera. """Serve an HTTP MJPEG stream from the camera.
@ -409,10 +400,9 @@ class CameraView(HomeAssistantView):
request.query.get('token') in camera.access_tokens) request.query.get('token') in camera.access_tokens)
if not authenticated: if not authenticated:
return web.Response(status=401) raise web.HTTPUnauthorized()
response = await self.handle(request, camera) return await self.handle(request, camera)
return response
async def handle(self, request, camera): async def handle(self, request, camera):
"""Handle the camera request.""" """Handle the camera request."""
@ -435,7 +425,7 @@ class CameraImageView(CameraView):
return web.Response(body=image, return web.Response(body=image,
content_type=camera.content_type) content_type=camera.content_type)
return web.Response(status=500) raise web.HTTPInternalServerError()
class CameraMjpegStream(CameraView): class CameraMjpegStream(CameraView):
@ -448,8 +438,7 @@ class CameraMjpegStream(CameraView):
"""Serve camera stream, possibly with interval.""" """Serve camera stream, possibly with interval."""
interval = request.query.get('interval') interval = request.query.get('interval')
if interval is None: if interval is None:
await camera.handle_async_mjpeg_stream(request) return await camera.handle_async_mjpeg_stream(request)
return
try: try:
# Compose camera stream from stills # Compose camera stream from stills
@ -457,10 +446,9 @@ class CameraMjpegStream(CameraView):
if interval < MIN_STREAM_INTERVAL: if interval < MIN_STREAM_INTERVAL:
raise ValueError("Stream interval must be be > {}" raise ValueError("Stream interval must be be > {}"
.format(MIN_STREAM_INTERVAL)) .format(MIN_STREAM_INTERVAL))
await camera.handle_async_still_stream(request, interval) return await camera.handle_async_still_stream(request, interval)
return
except ValueError: except ValueError:
return web.Response(status=400) raise web.HTTPBadRequest()
@callback @callback

View File

@ -29,8 +29,8 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
}) })
@asyncio.coroutine async def async_setup_platform(hass, config, async_add_devices,
def async_setup_platform(hass, config, async_add_devices, discovery_info=None): discovery_info=None):
"""Set up a FFmpeg camera.""" """Set up a FFmpeg camera."""
if not hass.data[DATA_FFMPEG].async_run_test(config.get(CONF_INPUT)): if not hass.data[DATA_FFMPEG].async_run_test(config.get(CONF_INPUT)):
return return
@ -49,30 +49,30 @@ class FFmpegCamera(Camera):
self._input = config.get(CONF_INPUT) self._input = config.get(CONF_INPUT)
self._extra_arguments = config.get(CONF_EXTRA_ARGUMENTS) self._extra_arguments = config.get(CONF_EXTRA_ARGUMENTS)
@asyncio.coroutine async def async_camera_image(self):
def async_camera_image(self):
"""Return a still image response from the camera.""" """Return a still image response from the camera."""
from haffmpeg import ImageFrame, IMAGE_JPEG from haffmpeg import ImageFrame, IMAGE_JPEG
ffmpeg = ImageFrame(self._manager.binary, loop=self.hass.loop) ffmpeg = ImageFrame(self._manager.binary, loop=self.hass.loop)
image = yield from asyncio.shield(ffmpeg.get_image( image = await asyncio.shield(ffmpeg.get_image(
self._input, output_format=IMAGE_JPEG, self._input, output_format=IMAGE_JPEG,
extra_cmd=self._extra_arguments), loop=self.hass.loop) extra_cmd=self._extra_arguments), loop=self.hass.loop)
return image return image
@asyncio.coroutine async def handle_async_mjpeg_stream(self, request):
def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera.""" """Generate an HTTP MJPEG stream from the camera."""
from haffmpeg import CameraMjpeg from haffmpeg import CameraMjpeg
stream = CameraMjpeg(self._manager.binary, loop=self.hass.loop) stream = CameraMjpeg(self._manager.binary, loop=self.hass.loop)
yield from stream.open_camera( await stream.open_camera(
self._input, extra_cmd=self._extra_arguments) self._input, extra_cmd=self._extra_arguments)
yield from async_aiohttp_proxy_stream( try:
self.hass, request, stream, return await async_aiohttp_proxy_stream(
'multipart/x-mixed-replace;boundary=ffserver') self.hass, request, stream,
yield from stream.close() 'multipart/x-mixed-replace;boundary=ffserver')
finally:
await stream.close()
@property @property
def name(self): def name(self):

View File

@ -123,19 +123,18 @@ class MjpegCamera(Camera):
with closing(req) as response: with closing(req) as response:
return extract_image_from_mjpeg(response.iter_content(102400)) return extract_image_from_mjpeg(response.iter_content(102400))
@asyncio.coroutine async def handle_async_mjpeg_stream(self, request):
def handle_async_mjpeg_stream(self, request):
"""Generate an HTTP MJPEG stream from the camera.""" """Generate an HTTP MJPEG stream from the camera."""
# aiohttp don't support DigestAuth -> Fallback # aiohttp don't support DigestAuth -> Fallback
if self._authentication == HTTP_DIGEST_AUTHENTICATION: if self._authentication == HTTP_DIGEST_AUTHENTICATION:
yield from super().handle_async_mjpeg_stream(request) await super().handle_async_mjpeg_stream(request)
return return
# connect to stream # connect to stream
websession = async_get_clientsession(self.hass) websession = async_get_clientsession(self.hass)
stream_coro = websession.get(self._mjpeg_url, auth=self._auth) stream_coro = websession.get(self._mjpeg_url, auth=self._auth)
yield from async_aiohttp_proxy_web(self.hass, request, stream_coro) return await async_aiohttp_proxy_web(self.hass, request, stream_coro)
@property @property
def name(self): def name(self):

View File

@ -191,8 +191,8 @@ class ProxyCamera(Camera):
stream_coro = websession.get(url, headers=self._headers) stream_coro = websession.get(url, headers=self._headers)
if not self._stream_opts: if not self._stream_opts:
await async_aiohttp_proxy_web(self.hass, request, stream_coro) return await async_aiohttp_proxy_web(
return self.hass, request, stream_coro)
response = aiohttp.web.StreamResponse() response = aiohttp.web.StreamResponse()
response.content_type = ('multipart/x-mixed-replace; ' response.content_type = ('multipart/x-mixed-replace; '
@ -229,15 +229,10 @@ class ProxyCamera(Camera):
_resize_image, image, self._stream_opts) _resize_image, image, self._stream_opts)
await write(image) await write(image)
data = data[jpg_end + 2:] data = data[jpg_end + 2:]
except asyncio.CancelledError:
_LOGGER.debug("Stream closed by frontend.")
req.close()
response = None
raise
finally: finally:
if response is not None: req.close()
await response.write_eof()
return response
@property @property
def name(self): def name(self):

View File

@ -66,14 +66,13 @@ def async_create_clientsession(hass, verify_ssl=True, auto_cleanup=True,
return clientsession return clientsession
@asyncio.coroutine
@bind_hass @bind_hass
def async_aiohttp_proxy_web(hass, request, web_coro, buffer_size=102400, async def async_aiohttp_proxy_web(hass, request, web_coro,
timeout=10): buffer_size=102400, timeout=10):
"""Stream websession request to aiohttp web response.""" """Stream websession request to aiohttp web response."""
try: try:
with async_timeout.timeout(timeout, loop=hass.loop): with async_timeout.timeout(timeout, loop=hass.loop):
req = yield from web_coro req = await web_coro
except asyncio.CancelledError: except asyncio.CancelledError:
# The user cancelled the request # The user cancelled the request
@ -88,7 +87,7 @@ def async_aiohttp_proxy_web(hass, request, web_coro, buffer_size=102400,
raise HTTPBadGateway() from err raise HTTPBadGateway() from err
try: try:
yield from async_aiohttp_proxy_stream( return await async_aiohttp_proxy_stream(
hass, hass,
request, request,
req.content, req.content,
@ -112,19 +111,15 @@ async def async_aiohttp_proxy_stream(hass, request, stream, content_type,
data = await stream.read(buffer_size) data = await stream.read(buffer_size)
if not data: if not data:
await response.write_eof()
break break
await response.write(data) await response.write(data)
except (asyncio.TimeoutError, aiohttp.ClientError): except (asyncio.TimeoutError, aiohttp.ClientError):
# Something went wrong fetching data, close connection gracefully # Something went wrong fetching data, closed connection
await response.write_eof()
except asyncio.CancelledError:
# The user closed the connection
pass pass
return response
@callback @callback
def _async_register_clientsession_shutdown(hass, clientsession): def _async_register_clientsession_shutdown(hass, clientsession):