diff --git a/homeassistant/components/hassio/websocket_api.py b/homeassistant/components/hassio/websocket_api.py index dfc2b7dc01d..e2ffff5e1e3 100644 --- a/homeassistant/components/hassio/websocket_api.py +++ b/homeassistant/components/hassio/websocket_api.py @@ -1,11 +1,13 @@ """Websocekt API handlers for the hassio integration.""" import logging +import re import voluptuous as vol from homeassistant.components import websocket_api from homeassistant.components.websocket_api.connection import ActiveConnection from homeassistant.core import HomeAssistant, callback +from homeassistant.exceptions import Unauthorized import homeassistant.helpers.config_validation as cv from homeassistant.helpers.dispatcher import ( async_dispatcher_connect, @@ -34,6 +36,11 @@ SCHEMA_WEBSOCKET_EVENT = vol.Schema( extra=vol.ALLOW_EXTRA, ) +# Endpoints needed for ingress can't require admin because addons can set `panel_admin: false` +WS_NO_ADMIN_ENDPOINTS = re.compile( + r"^(?:" r"|/ingress/(session|validate_session)" r"|/addons/[^/]+/info" r")$" +) + _LOGGER: logging.Logger = logging.getLogger(__package__) @@ -79,7 +86,6 @@ async def websocket_supervisor_event( connection.send_result(msg[WS_ID]) -@websocket_api.require_admin @websocket_api.async_response @websocket_api.websocket_command( { @@ -94,6 +100,10 @@ async def websocket_supervisor_api( hass: HomeAssistant, connection: ActiveConnection, msg: dict ): """Websocket handler to call Supervisor API.""" + if not connection.user.is_admin and not WS_NO_ADMIN_ENDPOINTS.match( + msg[ATTR_ENDPOINT] + ): + raise Unauthorized() supervisor: HassIO = hass.data[DOMAIN] try: result = await supervisor.send_command( diff --git a/tests/components/hassio/test_websocket_api.py b/tests/components/hassio/test_websocket_api.py index 931c1527b78..5d11d13166e 100644 --- a/tests/components/hassio/test_websocket_api.py +++ b/tests/components/hassio/test_websocket_api.py @@ -156,3 +156,69 @@ async def test_websocket_supervisor_api_error( msg = await websocket_client.receive_json() assert msg["error"]["message"] == "example error" + + +async def test_websocket_non_admin_user( + hassio_env, hass: HomeAssistant, hass_ws_client, aioclient_mock, hass_admin_user +): + """Test Supervisor websocket api error.""" + hass_admin_user.groups = [] + assert await async_setup_component(hass, "hassio", {}) + websocket_client = await hass_ws_client(hass) + aioclient_mock.get( + "http://127.0.0.1/addons/test_addon/info", + json={"result": "ok", "data": {}}, + ) + aioclient_mock.get( + "http://127.0.0.1/ingress/session", + json={"result": "ok", "data": {}}, + ) + aioclient_mock.get( + "http://127.0.0.1/ingress/validate_session", + json={"result": "ok", "data": {}}, + ) + + await websocket_client.send_json( + { + WS_ID: 1, + WS_TYPE: WS_TYPE_API, + ATTR_ENDPOINT: "/addons/test_addon/info", + ATTR_METHOD: "get", + } + ) + msg = await websocket_client.receive_json() + assert msg["result"] == {} + + await websocket_client.send_json( + { + WS_ID: 2, + WS_TYPE: WS_TYPE_API, + ATTR_ENDPOINT: "/ingress/session", + ATTR_METHOD: "get", + } + ) + msg = await websocket_client.receive_json() + assert msg["result"] == {} + + await websocket_client.send_json( + { + WS_ID: 3, + WS_TYPE: WS_TYPE_API, + ATTR_ENDPOINT: "/ingress/validate_session", + ATTR_METHOD: "get", + } + ) + msg = await websocket_client.receive_json() + assert msg["result"] == {} + + await websocket_client.send_json( + { + WS_ID: 4, + WS_TYPE: WS_TYPE_API, + ATTR_ENDPOINT: "/supervisor/info", + ATTR_METHOD: "get", + } + ) + + msg = await websocket_client.receive_json() + assert msg["error"]["message"] == "Unauthorized"