Allow core to mark addons as system managed (#5145)

* Allow core to mark addons as system managed

* System managed options only settable by Home Assistant
This commit is contained in:
Mike Degatano 2024-08-13 09:14:42 -04:00 committed by GitHub
parent 5d6738ced8
commit eb3986bea2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 159 additions and 7 deletions

View File

@ -46,6 +46,8 @@ from ..const import (
ATTR_SLUG,
ATTR_STATE,
ATTR_SYSTEM,
ATTR_SYSTEM_MANAGED,
ATTR_SYSTEM_MANAGED_CONFIG_ENTRY,
ATTR_TYPE,
ATTR_USER,
ATTR_UUID,
@ -363,6 +365,37 @@ class Addon(AddonModel):
else:
self.persist[ATTR_WATCHDOG] = value
@property
def system_managed(self) -> bool:
"""Return True if addon is managed by Home Assistant."""
return self.persist[ATTR_SYSTEM_MANAGED]
@system_managed.setter
def system_managed(self, value: bool) -> None:
"""Set system managed enable/disable."""
if not value and self.system_managed_config_entry:
self.system_managed_config_entry = None
self.persist[ATTR_SYSTEM_MANAGED] = value
@property
def system_managed_config_entry(self) -> str | None:
"""Return id of config entry managing this addon (if any)."""
if not self.system_managed:
return None
return self.persist.get(ATTR_SYSTEM_MANAGED_CONFIG_ENTRY)
@system_managed_config_entry.setter
def system_managed_config_entry(self, value: str | None) -> None:
"""Set ID of config entry managing this addon."""
if not self.system_managed:
_LOGGER.warning(
"Ignoring system managed config entry for %s because it is not system managed",
self.slug,
)
else:
self.persist[ATTR_SYSTEM_MANAGED_CONFIG_ENTRY] = value
@property
def uuid(self) -> str:
"""Return an API token for this add-on."""

View File

@ -78,6 +78,8 @@ from ..const import (
ATTR_STATE,
ATTR_STDIN,
ATTR_SYSTEM,
ATTR_SYSTEM_MANAGED,
ATTR_SYSTEM_MANAGED_CONFIG_ENTRY,
ATTR_TIMEOUT,
ATTR_TMPFS,
ATTR_TRANSLATIONS,
@ -467,6 +469,8 @@ SCHEMA_ADDON_USER = vol.Schema(
vol.Optional(ATTR_PROTECTED, default=True): vol.Boolean(),
vol.Optional(ATTR_INGRESS_PANEL, default=False): vol.Boolean(),
vol.Optional(ATTR_WATCHDOG, default=False): vol.Boolean(),
vol.Optional(ATTR_SYSTEM_MANAGED, default=False): vol.Boolean(),
vol.Optional(ATTR_SYSTEM_MANAGED_CONFIG_ENTRY, default=None): vol.Maybe(str),
},
extra=vol.REMOVE_EXTRA,
)

View File

@ -1,4 +1,5 @@
"""Init file for Supervisor RESTful API."""
from functools import partial
import logging
from pathlib import Path
@ -508,6 +509,7 @@ class RestAPI(CoreSysAttributes):
web.post("/addons/{addon}/stop", api_addons.stop),
web.post("/addons/{addon}/restart", api_addons.restart),
web.post("/addons/{addon}/options", api_addons.options),
web.post("/addons/{addon}/sys_options", api_addons.sys_options),
web.post(
"/addons/{addon}/options/validate", api_addons.options_validate
),

View File

@ -1,4 +1,5 @@
"""Init file for Supervisor Home Assistant RESTful API."""
import asyncio
from collections.abc import Awaitable
import logging
@ -81,6 +82,8 @@ from ..const import (
ATTR_STARTUP,
ATTR_STATE,
ATTR_STDIN,
ATTR_SYSTEM_MANAGED,
ATTR_SYSTEM_MANAGED_CONFIG_ENTRY,
ATTR_TRANSLATIONS,
ATTR_UART,
ATTR_UDEV,
@ -126,6 +129,13 @@ SCHEMA_OPTIONS = vol.Schema(
}
)
SCHEMA_SYS_OPTIONS = vol.Schema(
{
vol.Optional(ATTR_SYSTEM_MANAGED): vol.Boolean(),
vol.Optional(ATTR_SYSTEM_MANAGED_CONFIG_ENTRY): vol.Maybe(str),
}
)
SCHEMA_SECURITY = vol.Schema({vol.Optional(ATTR_PROTECTED): vol.Boolean()})
SCHEMA_UNINSTALL = vol.Schema(
@ -178,6 +188,7 @@ class APIAddons(CoreSysAttributes):
ATTR_URL: addon.url,
ATTR_ICON: addon.with_icon,
ATTR_LOGO: addon.with_logo,
ATTR_SYSTEM_MANAGED: addon.system_managed,
}
for addon in self.sys_addons.installed
]
@ -265,6 +276,8 @@ class APIAddons(CoreSysAttributes):
ATTR_WATCHDOG: addon.watchdog,
ATTR_DEVICES: addon.static_devices
+ [device.path for device in addon.devices],
ATTR_SYSTEM_MANAGED: addon.system_managed,
ATTR_SYSTEM_MANAGED_CONFIG_ENTRY: addon.system_managed_config_entry,
}
return data
@ -304,6 +317,20 @@ class APIAddons(CoreSysAttributes):
addon.save_persist()
@api_process
async def sys_options(self, request: web.Request) -> None:
"""Store system options for an add-on."""
addon = self.get_addon_for_request(request)
# Validate/Process Body
body = await api_validate(SCHEMA_SYS_OPTIONS, request)
if ATTR_SYSTEM_MANAGED in body:
addon.system_managed = body[ATTR_SYSTEM_MANAGED]
if ATTR_SYSTEM_MANAGED_CONFIG_ENTRY in body:
addon.system_managed_config_entry = body[ATTR_SYSTEM_MANAGED_CONFIG_ENTRY]
addon.save_persist()
@api_process
async def options_validate(self, request: web.Request) -> None:
"""Validate user options for add-on."""

View File

@ -1,4 +1,5 @@
"""Handle security part of this API."""
import logging
import re
from typing import Final
@ -77,6 +78,13 @@ ADDONS_API_BYPASS: Final = re.compile(
r")$"
)
# Home Assistant only
CORE_ONLY_PATHS: Final = re.compile(
r"^(?:"
r"/addons/" + RE_SLUG + "/sys_options"
r")$"
)
# Policy role add-on API access
ADDONS_ROLE_ACCESS: dict[str, re.Pattern] = {
ROLE_DEFAULT: re.compile(
@ -232,6 +240,9 @@ class SecurityMiddleware(CoreSysAttributes):
if supervisor_token == self.sys_homeassistant.supervisor_token:
_LOGGER.debug("%s access from Home Assistant", request.path)
request_from = self.sys_homeassistant
elif CORE_ONLY_PATHS.match(request.path):
_LOGGER.warning("Attempted access to %s from client besides Home Assistant")
raise HTTPForbidden()
# Host
if supervisor_token == self.sys_plugins.cli.supervisor_token:

View File

@ -309,6 +309,8 @@ ATTR_SUPERVISOR_VERSION = "supervisor_version"
ATTR_SUPPORTED = "supported"
ATTR_SUPPORTED_ARCH = "supported_arch"
ATTR_SYSTEM = "system"
ATTR_SYSTEM_MANAGED = "system_managed"
ATTR_SYSTEM_MANAGED_CONFIG_ENTRY = "system_managed_config_entry"
ATTR_TIMEOUT = "timeout"
ATTR_TIMEZONE = "timezone"
ATTR_TITLE = "title"

View File

@ -1,4 +1,5 @@
"""Test API security layer."""
import asyncio
from http import HTTPStatus
from unittest.mock import patch
@ -180,6 +181,8 @@ async def test_bad_requests(
("post", "/addons/abc123/restart", {"admin", "manager"}),
("post", "/addons/abc123/security", {"admin"}),
("post", "/os/datadisk/wipe", {"admin"}),
("post", "/addons/self/sys_options", set()),
("post", "/addons/abc123/sys_options", set()),
],
)
async def test_token_validation(
@ -205,3 +208,12 @@ async def test_token_validation(
request_path, headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 403
async def test_home_assistant_paths(api_token_validation: TestClient, coresys: CoreSys):
"""Test Home Assistant only paths."""
coresys.homeassistant.supervisor_token = "abc123"
resp = await api_token_validation.post(
"/addons/local_test/sys_options", headers={"Authorization": "Bearer abc123"}
)
assert resp.status == 200

View File

@ -4,6 +4,7 @@ import asyncio
from unittest.mock import MagicMock, PropertyMock, patch
from aiohttp.test_utils import TestClient
import pytest
from supervisor.addons.addon import Addon
from supervisor.addons.build import AddonBuild
@ -231,13 +232,13 @@ async def test_api_addon_rebuild_healthcheck(
nonlocal _container_events_task
_container_events_task = asyncio.create_task(container_events())
with patch.object(
AddonBuild, "is_valid", new=PropertyMock(return_value=True)
), patch.object(DockerAddon, "is_running", return_value=False), patch.object(
Addon, "need_build", new=PropertyMock(return_value=True)
), patch.object(
CpuArch, "supported", new=PropertyMock(return_value=["amd64"])
), patch.object(DockerAddon, "run", new=container_events_task):
with (
patch.object(AddonBuild, "is_valid", new=PropertyMock(return_value=True)),
patch.object(DockerAddon, "is_running", return_value=False),
patch.object(Addon, "need_build", new=PropertyMock(return_value=True)),
patch.object(CpuArch, "supported", new=PropertyMock(return_value=["amd64"])),
patch.object(DockerAddon, "run", new=container_events_task),
):
resp = await api_client.post("/addons/local_ssh/rebuild")
assert state_changes == [AddonState.STOPPED, AddonState.STARTUP]
@ -285,3 +286,63 @@ async def test_api_addon_uninstall_remove_config(
assert resp.status == 200
assert not coresys.addons.get("local_example", local_only=True)
assert not test_folder.exists()
async def test_api_addon_system_managed(
api_client: TestClient,
coresys: CoreSys,
install_addon_example: Addon,
caplog: pytest.LogCaptureFixture,
tmp_supervisor_data,
path_extern,
):
"""Test setting system managed for an addon."""
install_addon_example.data["ingress"] = False
# Not system managed
resp = await api_client.get("/addons")
body = await resp.json()
assert body["data"]["addons"][0]["slug"] == "local_example"
assert body["data"]["addons"][0]["system_managed"] is False
resp = await api_client.get("/addons/local_example/info")
body = await resp.json()
assert body["data"]["system_managed"] is False
assert body["data"]["system_managed_config_entry"] is None
# Mark as system managed
coresys.addons.data.save_data.reset_mock()
resp = await api_client.post(
"/addons/local_example/sys_options",
json={"system_managed": True, "system_managed_config_entry": "abc123"},
)
assert resp.status == 200
coresys.addons.data.save_data.assert_called_once()
resp = await api_client.get("/addons")
body = await resp.json()
assert body["data"]["addons"][0]["system_managed"] is True
resp = await api_client.get("/addons/local_example/info")
body = await resp.json()
assert body["data"]["system_managed"] is True
assert body["data"]["system_managed_config_entry"] == "abc123"
# Revert. Log that cannot have a config entry if not system managed
coresys.addons.data.save_data.reset_mock()
resp = await api_client.post(
"/addons/local_example/sys_options",
json={"system_managed": False, "system_managed_config_entry": "abc123"},
)
assert resp.status == 200
coresys.addons.data.save_data.assert_called_once()
assert "Ignoring system managed config entry" in caplog.text
resp = await api_client.get("/addons")
body = await resp.json()
assert body["data"]["addons"][0]["system_managed"] is False
resp = await api_client.get("/addons/local_example/info")
body = await resp.json()
assert body["data"]["system_managed"] is False
assert body["data"]["system_managed_config_entry"] is None