Better handle large amounts of data being sent over WS (#23842)

* Better handle large amounts of data being sent over WS

* Lint
This commit is contained in:
Paulus Schoutsen 2019-05-14 05:57:47 +02:00 committed by GitHub
parent b2a1204bc5
commit 45085dd97f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 20 deletions

View File

@ -522,12 +522,10 @@ async def websocket_camera_thumbnail(hass, connection, msg):
""" """
try: try:
image = await async_get_image(hass, msg['entity_id']) image = await async_get_image(hass, msg['entity_id'])
connection.send_message(websocket_api.result_message( await connection.send_big_result(msg['id'], {
msg['id'], { 'content_type': image.content_type,
'content_type': image.content_type, 'content': base64.b64encode(image.content).decode('utf-8')
'content': base64.b64encode(image.content).decode('utf-8') })
}
))
except HomeAssistantError: except HomeAssistantError:
connection.send_message(websocket_api.error_message( connection.send_message(websocket_api.error_message(
msg['id'], 'image_fetch_failed', 'Unable to fetch image')) msg['id'], 'image_fetch_failed', 'Unable to fetch image'))

View File

@ -176,18 +176,19 @@ def handle_yaml_errors(func):
error = None error = None
try: try:
result = await func(hass, connection, msg) result = await func(hass, connection, msg)
message = websocket_api.result_message(
msg['id'], result
)
except ConfigNotFound: except ConfigNotFound:
error = 'config_not_found', 'No config found.' error = 'config_not_found', 'No config found.'
except HomeAssistantError as err: except HomeAssistantError as err:
error = 'error', str(err) error = 'error', str(err)
if error is not None: if error is not None:
message = websocket_api.error_message(msg['id'], *error) connection.send_error(msg['id'], *error)
return
connection.send_message(message) if msg is not None:
await connection.send_big_result(msg['id'], result)
else:
connection.send_result(msg['id'], result)
return send_with_error_handling return send_with_error_handling

View File

@ -869,8 +869,8 @@ async def websocket_handle_thumbnail(hass, connection, msg):
'Failed to fetch thumbnail')) 'Failed to fetch thumbnail'))
return return
connection.send_message(websocket_api.result_message( await connection.send_big_result(
msg['id'], { msg['id'], {
'content_type': content_type, 'content_type': content_type,
'content': base64.b64encode(data).decode('utf-8') 'content': base64.b64encode(data).decode('utf-8')
})) })

View File

@ -36,6 +36,13 @@ class ActiveConnection:
"""Send a result message.""" """Send a result message."""
self.send_message(messages.result_message(msg_id, result)) self.send_message(messages.result_message(msg_id, result))
async def send_big_result(self, msg_id, result):
"""Send a result message that would be expensive to JSON serialize."""
content = await self.hass.async_add_executor_job(
const.JSON_DUMP, messages.result_message(msg_id, result)
)
self.send_message(content)
@callback @callback
def send_error(self, msg_id, code, message): def send_error(self, msg_id, code, message):
"""Send a error message.""" """Send a error message."""

View File

@ -1,6 +1,9 @@
"""Websocket constants.""" """Websocket constants."""
import asyncio import asyncio
from concurrent import futures from concurrent import futures
from functools import partial
import json
from homeassistant.helpers.json import JSONEncoder
DOMAIN = 'websocket_api' DOMAIN = 'websocket_api'
URL = '/api/websocket' URL = '/api/websocket'
@ -27,3 +30,5 @@ SIGNAL_WEBSOCKET_DISCONNECTED = 'websocket_disconnected'
# Data used to store the current connection list # Data used to store the current connection list
DATA_CONNECTIONS = DOMAIN + '.connections' DATA_CONNECTIONS = DOMAIN + '.connections'
JSON_DUMP = partial(json.dumps, cls=JSONEncoder, allow_nan=False)

View File

@ -1,8 +1,6 @@
"""View to accept incoming websocket connection.""" """View to accept incoming websocket connection."""
import asyncio import asyncio
from contextlib import suppress from contextlib import suppress
from functools import partial
import json
import logging import logging
from aiohttp import web, WSMsgType from aiohttp import web, WSMsgType
@ -11,18 +9,15 @@ import async_timeout
from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import callback from homeassistant.core import callback
from homeassistant.components.http import HomeAssistantView from homeassistant.components.http import HomeAssistantView
from homeassistant.helpers.json import JSONEncoder
from .const import ( from .const import (
MAX_PENDING_MSG, CANCELLATION_ERRORS, URL, ERR_UNKNOWN_ERROR, MAX_PENDING_MSG, CANCELLATION_ERRORS, URL, ERR_UNKNOWN_ERROR,
SIGNAL_WEBSOCKET_CONNECTED, SIGNAL_WEBSOCKET_DISCONNECTED, SIGNAL_WEBSOCKET_CONNECTED, SIGNAL_WEBSOCKET_DISCONNECTED,
DATA_CONNECTIONS) DATA_CONNECTIONS, JSON_DUMP)
from .auth import AuthPhase, auth_required_message from .auth import AuthPhase, auth_required_message
from .error import Disconnect from .error import Disconnect
from .messages import error_message from .messages import error_message
JSON_DUMP = partial(json.dumps, cls=JSONEncoder, allow_nan=False)
class WebsocketAPIView(HomeAssistantView): class WebsocketAPIView(HomeAssistantView):
"""View to serve a websockets endpoint.""" """View to serve a websockets endpoint."""
@ -62,7 +57,10 @@ class WebSocketHandler:
break break
self._logger.debug("Sending %s", message) self._logger.debug("Sending %s", message)
try: try:
await self.wsock.send_json(message, dumps=JSON_DUMP) if isinstance(message, str):
await self.wsock.send_str(message)
else:
await self.wsock.send_json(message, dumps=JSON_DUMP)
except (ValueError, TypeError) as err: except (ValueError, TypeError) as err:
self._logger.error('Unable to serialize to JSON: %s\n%s', self._logger.error('Unable to serialize to JSON: %s\n%s',
err, message) err, message)

View File

@ -0,0 +1,30 @@
"""Test WebSocket Connection class."""
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api import const
async def test_send_big_result(hass, websocket_client):
"""Test sending big results over the WS."""
@websocket_api.websocket_command({
'type': 'big_result'
})
@websocket_api.async_response
async def send_big_result(hass, connection, msg):
await connection.send_big_result(
msg['id'], {'big': 'result'}
)
hass.components.websocket_api.async_register_command(
send_big_result
)
await websocket_client.send_json({
'id': 5,
'type': 'big_result',
})
msg = await websocket_client.receive_json()
assert msg['id'] == 5
assert msg['type'] == const.TYPE_RESULT
assert msg['success']
assert msg['result'] == {'big': 'result'}