Fix ingress for non admin (#60120)

This commit is contained in:
Joakim Sørensen 2021-11-29 23:03:16 +01:00 committed by GitHub
parent 914f7f85ec
commit 8a5df5f7eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 77 additions and 1 deletions

View File

@ -1,11 +1,13 @@
"""Websocekt API handlers for the hassio integration.""" """Websocekt API handlers for the hassio integration."""
import logging import logging
import re
import voluptuous as vol import voluptuous as vol
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.components.websocket_api.connection import ActiveConnection from homeassistant.components.websocket_api.connection import ActiveConnection
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import Unauthorized
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import ( from homeassistant.helpers.dispatcher import (
async_dispatcher_connect, async_dispatcher_connect,
@ -34,6 +36,11 @@ SCHEMA_WEBSOCKET_EVENT = vol.Schema(
extra=vol.ALLOW_EXTRA, extra=vol.ALLOW_EXTRA,
) )
# Endpoints needed for ingress can't require admin because addons can set `panel_admin: false`
WS_NO_ADMIN_ENDPOINTS = re.compile(
r"^(?:" r"|/ingress/(session|validate_session)" r"|/addons/[^/]+/info" r")$"
)
_LOGGER: logging.Logger = logging.getLogger(__package__) _LOGGER: logging.Logger = logging.getLogger(__package__)
@ -79,7 +86,6 @@ async def websocket_supervisor_event(
connection.send_result(msg[WS_ID]) connection.send_result(msg[WS_ID])
@websocket_api.require_admin
@websocket_api.async_response @websocket_api.async_response
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
@ -94,6 +100,10 @@ async def websocket_supervisor_api(
hass: HomeAssistant, connection: ActiveConnection, msg: dict hass: HomeAssistant, connection: ActiveConnection, msg: dict
): ):
"""Websocket handler to call Supervisor API.""" """Websocket handler to call Supervisor API."""
if not connection.user.is_admin and not WS_NO_ADMIN_ENDPOINTS.match(
msg[ATTR_ENDPOINT]
):
raise Unauthorized()
supervisor: HassIO = hass.data[DOMAIN] supervisor: HassIO = hass.data[DOMAIN]
try: try:
result = await supervisor.send_command( result = await supervisor.send_command(

View File

@ -156,3 +156,69 @@ async def test_websocket_supervisor_api_error(
msg = await websocket_client.receive_json() msg = await websocket_client.receive_json()
assert msg["error"]["message"] == "example error" assert msg["error"]["message"] == "example error"
async def test_websocket_non_admin_user(
hassio_env, hass: HomeAssistant, hass_ws_client, aioclient_mock, hass_admin_user
):
"""Test Supervisor websocket api error."""
hass_admin_user.groups = []
assert await async_setup_component(hass, "hassio", {})
websocket_client = await hass_ws_client(hass)
aioclient_mock.get(
"http://127.0.0.1/addons/test_addon/info",
json={"result": "ok", "data": {}},
)
aioclient_mock.get(
"http://127.0.0.1/ingress/session",
json={"result": "ok", "data": {}},
)
aioclient_mock.get(
"http://127.0.0.1/ingress/validate_session",
json={"result": "ok", "data": {}},
)
await websocket_client.send_json(
{
WS_ID: 1,
WS_TYPE: WS_TYPE_API,
ATTR_ENDPOINT: "/addons/test_addon/info",
ATTR_METHOD: "get",
}
)
msg = await websocket_client.receive_json()
assert msg["result"] == {}
await websocket_client.send_json(
{
WS_ID: 2,
WS_TYPE: WS_TYPE_API,
ATTR_ENDPOINT: "/ingress/session",
ATTR_METHOD: "get",
}
)
msg = await websocket_client.receive_json()
assert msg["result"] == {}
await websocket_client.send_json(
{
WS_ID: 3,
WS_TYPE: WS_TYPE_API,
ATTR_ENDPOINT: "/ingress/validate_session",
ATTR_METHOD: "get",
}
)
msg = await websocket_client.receive_json()
assert msg["result"] == {}
await websocket_client.send_json(
{
WS_ID: 4,
WS_TYPE: WS_TYPE_API,
ATTR_ENDPOINT: "/supervisor/info",
ATTR_METHOD: "get",
}
)
msg = await websocket_client.receive_json()
assert msg["error"]["message"] == "Unauthorized"