diff --git a/supervisor/addons/const.py b/supervisor/addons/const.py index b0fd3aa29..88fccbc18 100644 --- a/supervisor/addons/const.py +++ b/supervisor/addons/const.py @@ -10,3 +10,4 @@ class AddonBackupMode(str, Enum): ATTR_BACKUP = "backup" +ATTR_CODENOTARY = "codenotary" diff --git a/supervisor/addons/model.py b/supervisor/addons/model.py index 103aa8348..19d87fd21 100644 --- a/supervisor/addons/model.py +++ b/supervisor/addons/model.py @@ -79,7 +79,7 @@ from ..const import ( ) from ..coresys import CoreSys, CoreSysAttributes from ..docker.const import Capabilities -from .const import ATTR_BACKUP +from .const import ATTR_BACKUP, ATTR_CODENOTARY from .options import AddonOptions, UiOptions from .validate import RE_SERVICE, RE_VOLUME @@ -578,6 +578,16 @@ class AddonModel(CoreSysAttributes, ABC): """Return True if the add-on accesses the system journal.""" return self.data[ATTR_JOURNALD] + @property + def signed(self) -> bool: + """Return True if the image is signed.""" + return ATTR_CODENOTARY in self.data + + @property + def codenotary(self) -> Optional[str]: + """Return Signer email address for CAS.""" + return self.data.get(ATTR_CODENOTARY) + def __eq__(self, other): """Compaired add-on objects.""" if not isinstance(other, AddonModel): diff --git a/supervisor/addons/utils.py b/supervisor/addons/utils.py index fecf8062f..0351ffda2 100644 --- a/supervisor/addons/utils.py +++ b/supervisor/addons/utils.py @@ -16,10 +16,10 @@ _LOGGER: logging.Logger = logging.getLogger(__name__) def rating_security(addon: AddonModel) -> int: - """Return 1-6 for security rating. + """Return 1-8 for security rating. 1 = not secure - 6 = high secure + 8 = high secure """ rating = 5 @@ -35,6 +35,10 @@ def rating_security(addon: AddonModel) -> int: elif addon.access_auth_api: rating += 1 + # Signed + if addon.signed: + rating += 1 + # Privileged options if ( any( @@ -70,7 +74,7 @@ def rating_security(addon: AddonModel) -> int: if addon.access_docker_api or addon.with_full_access: rating = 1 - return max(min(6, rating), 1) + return max(min(8, rating), 1) async def remove_data(folder: Path) -> None: diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index 6f4cf016e..5320cf2f0 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -110,7 +110,7 @@ from ..validate import ( uuid_match, version_tag, ) -from .const import ATTR_BACKUP +from .const import ATTR_BACKUP, ATTR_CODENOTARY from .options import RE_SCHEMA_ELEMENT _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -317,6 +317,7 @@ _SCHEMA_ADDON_CONFIG = vol.Schema( vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce( AddonBackupMode ), + vol.Optional(ATTR_CODENOTARY): vol.Email(), vol.Optional(ATTR_OPTIONS, default={}): dict, vol.Optional(ATTR_SCHEMA, default={}): vol.Any( vol.Schema( diff --git a/supervisor/api/addons.py b/supervisor/api/addons.py index bfc06d2a0..89f4fd021 100644 --- a/supervisor/api/addons.py +++ b/supervisor/api/addons.py @@ -106,6 +106,7 @@ from ..coresys import CoreSysAttributes from ..docker.stats import DockerStats from ..exceptions import APIError, APIForbidden, PwnedError, PwnedSecret from ..validate import docker_ports +from .const import ATTR_SIGNED from .utils import api_process, api_process_raw, api_validate, json_loads _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -269,6 +270,7 @@ class APIAddons(CoreSysAttributes): ATTR_IP_ADDRESS: None, ATTR_TRANSLATIONS: addon.translations, ATTR_INGRESS: addon.with_ingress, + ATTR_SIGNED: addon.signed, ATTR_INGRESS_ENTRY: None, ATTR_INGRESS_URL: None, ATTR_INGRESS_PORT: None, diff --git a/supervisor/api/const.py b/supervisor/api/const.py index 90c52f7d3..215258879 100644 --- a/supervisor/api/const.py +++ b/supervisor/api/const.py @@ -13,3 +13,4 @@ ATTR_APPARMOR_VERSION = "apparmor_version" ATTR_PANEL_PATH = "panel_path" ATTR_UPDATE_TYPE = "update_type" ATTR_AVAILABLE_UPDATES = "available_updates" +ATTR_SIGNED = "signed" diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 2e421b12c..6e03ea468 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -1,6 +1,7 @@ """Init file for Supervisor add-on Docker object.""" from __future__ import annotations +import asyncio from contextlib import suppress from ipaddress import IPv4Address, ip_address import logging @@ -673,6 +674,15 @@ class DockerAddon(DockerInterface): self, image_id: str, image: str, version: AwesomeVersion ) -> None: """Validate trust of content.""" + if not self.addon.signed: + return + + checksum = image_id.partition(":")[2] + job = asyncio.run_coroutine_threadsafe( + self.sys_security.verify_content(self.addon.codenotary, checksum), + self.sys_loop, + ) + job.result(timeout=20) @Job(conditions=[JobCondition.OS_AGENT]) async def _hardware_events(self, device: Device) -> None: diff --git a/supervisor/security.py b/supervisor/security.py index f4e7d14a0..b71914d36 100644 --- a/supervisor/security.py +++ b/supervisor/security.py @@ -56,14 +56,14 @@ class Security(FileConfiguration, CoreSysAttributes): """Set pwned is enabled/disabled.""" self._data[ATTR_PWNED] = value - async def verify_own_content(self, checksum: str) -> Awaitable[None]: - """Verify content from HA org.""" + async def verify_content(self, signer: str, checksum: str) -> None: + """Verify content on CAS.""" if not self.content_trust: _LOGGER.warning("Disabled content-trust, skip validation") return try: - await cas_validate(checksum=checksum, signer="notary@home-assistant.io") + await cas_validate(signer, checksum) except CodeNotaryUntrusted: raise except CodeNotaryError: @@ -71,6 +71,10 @@ class Security(FileConfiguration, CoreSysAttributes): raise return + def verify_own_content(self, checksum: str) -> Awaitable[None]: + """Verify content from HA org.""" + return self.verify_content("notary@home-assistant.io", checksum) + async def verify_secret(self, pwned_hash: str) -> None: """Verify pwned state of a secret.""" if not self.pwned: diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 000000000..39ecbc536 --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,55 @@ +"""Testing handling with Security.""" +from unittest.mock import AsyncMock, patch + +import pytest + +from supervisor.coresys import CoreSys +from supervisor.exceptions import CodeNotaryError + + +async def test_content_trust(coresys: CoreSys): + """Test Content-Trust.""" + + with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + assert cas_validate.called + cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") + + with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_own_content("ffffffffffffff") + assert cas_validate.called + cas_validate.assert_called_once_with( + "notary@home-assistant.io", "ffffffffffffff" + ) + + +async def test_disabled_content_trust(coresys: CoreSys): + """Test Content-Trust.""" + coresys.security.content_trust = False + + with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + assert not cas_validate.called + + with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate: + await coresys.security.verify_own_content("ffffffffffffff") + assert not cas_validate.called + + +async def test_force_content_trust(coresys: CoreSys): + """Force Content-Trust tests.""" + + with patch( + "supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError) + ) as cas_validate: + await coresys.security.verify_content("test@mail.com", "ffffffffffffff") + assert cas_validate.called + cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") + + coresys.security.force = True + + with patch( + "supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError) + ) as cas_validate: + with pytest.raises(CodeNotaryError): + await coresys.security.verify_content("test@mail.com", "ffffffffffffff")