mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-15 05:06:30 +00:00
Add add-on support for CAS (#3450)
* Add add-on support for CAS * add tests * Update tests/test_security.py Co-authored-by: Mike Degatano <michael.degatano@gmail.com> * address comments * Address rating comments Co-authored-by: Mike Degatano <michael.degatano@gmail.com>
This commit is contained in:
parent
b4cfbe46c1
commit
a2f43d8c7b
@ -10,3 +10,4 @@ class AddonBackupMode(str, Enum):
|
||||
|
||||
|
||||
ATTR_BACKUP = "backup"
|
||||
ATTR_CODENOTARY = "codenotary"
|
||||
|
@ -79,7 +79,7 @@ from ..const import (
|
||||
)
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..docker.const import Capabilities
|
||||
from .const import ATTR_BACKUP
|
||||
from .const import ATTR_BACKUP, ATTR_CODENOTARY
|
||||
from .options import AddonOptions, UiOptions
|
||||
from .validate import RE_SERVICE, RE_VOLUME
|
||||
|
||||
@ -578,6 +578,16 @@ class AddonModel(CoreSysAttributes, ABC):
|
||||
"""Return True if the add-on accesses the system journal."""
|
||||
return self.data[ATTR_JOURNALD]
|
||||
|
||||
@property
|
||||
def signed(self) -> bool:
|
||||
"""Return True if the image is signed."""
|
||||
return ATTR_CODENOTARY in self.data
|
||||
|
||||
@property
|
||||
def codenotary(self) -> Optional[str]:
|
||||
"""Return Signer email address for CAS."""
|
||||
return self.data.get(ATTR_CODENOTARY)
|
||||
|
||||
def __eq__(self, other):
|
||||
"""Compaired add-on objects."""
|
||||
if not isinstance(other, AddonModel):
|
||||
|
@ -16,10 +16,10 @@ _LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def rating_security(addon: AddonModel) -> int:
|
||||
"""Return 1-6 for security rating.
|
||||
"""Return 1-8 for security rating.
|
||||
|
||||
1 = not secure
|
||||
6 = high secure
|
||||
8 = high secure
|
||||
"""
|
||||
rating = 5
|
||||
|
||||
@ -35,6 +35,10 @@ def rating_security(addon: AddonModel) -> int:
|
||||
elif addon.access_auth_api:
|
||||
rating += 1
|
||||
|
||||
# Signed
|
||||
if addon.signed:
|
||||
rating += 1
|
||||
|
||||
# Privileged options
|
||||
if (
|
||||
any(
|
||||
@ -70,7 +74,7 @@ def rating_security(addon: AddonModel) -> int:
|
||||
if addon.access_docker_api or addon.with_full_access:
|
||||
rating = 1
|
||||
|
||||
return max(min(6, rating), 1)
|
||||
return max(min(8, rating), 1)
|
||||
|
||||
|
||||
async def remove_data(folder: Path) -> None:
|
||||
|
@ -110,7 +110,7 @@ from ..validate import (
|
||||
uuid_match,
|
||||
version_tag,
|
||||
)
|
||||
from .const import ATTR_BACKUP
|
||||
from .const import ATTR_BACKUP, ATTR_CODENOTARY
|
||||
from .options import RE_SCHEMA_ELEMENT
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
@ -317,6 +317,7 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
|
||||
vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce(
|
||||
AddonBackupMode
|
||||
),
|
||||
vol.Optional(ATTR_CODENOTARY): vol.Email(),
|
||||
vol.Optional(ATTR_OPTIONS, default={}): dict,
|
||||
vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
|
||||
vol.Schema(
|
||||
|
@ -106,6 +106,7 @@ from ..coresys import CoreSysAttributes
|
||||
from ..docker.stats import DockerStats
|
||||
from ..exceptions import APIError, APIForbidden, PwnedError, PwnedSecret
|
||||
from ..validate import docker_ports
|
||||
from .const import ATTR_SIGNED
|
||||
from .utils import api_process, api_process_raw, api_validate, json_loads
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
@ -269,6 +270,7 @@ class APIAddons(CoreSysAttributes):
|
||||
ATTR_IP_ADDRESS: None,
|
||||
ATTR_TRANSLATIONS: addon.translations,
|
||||
ATTR_INGRESS: addon.with_ingress,
|
||||
ATTR_SIGNED: addon.signed,
|
||||
ATTR_INGRESS_ENTRY: None,
|
||||
ATTR_INGRESS_URL: None,
|
||||
ATTR_INGRESS_PORT: None,
|
||||
|
@ -13,3 +13,4 @@ ATTR_APPARMOR_VERSION = "apparmor_version"
|
||||
ATTR_PANEL_PATH = "panel_path"
|
||||
ATTR_UPDATE_TYPE = "update_type"
|
||||
ATTR_AVAILABLE_UPDATES = "available_updates"
|
||||
ATTR_SIGNED = "signed"
|
||||
|
@ -1,6 +1,7 @@
|
||||
"""Init file for Supervisor add-on Docker object."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from contextlib import suppress
|
||||
from ipaddress import IPv4Address, ip_address
|
||||
import logging
|
||||
@ -673,6 +674,15 @@ class DockerAddon(DockerInterface):
|
||||
self, image_id: str, image: str, version: AwesomeVersion
|
||||
) -> None:
|
||||
"""Validate trust of content."""
|
||||
if not self.addon.signed:
|
||||
return
|
||||
|
||||
checksum = image_id.partition(":")[2]
|
||||
job = asyncio.run_coroutine_threadsafe(
|
||||
self.sys_security.verify_content(self.addon.codenotary, checksum),
|
||||
self.sys_loop,
|
||||
)
|
||||
job.result(timeout=20)
|
||||
|
||||
@Job(conditions=[JobCondition.OS_AGENT])
|
||||
async def _hardware_events(self, device: Device) -> None:
|
||||
|
@ -56,14 +56,14 @@ class Security(FileConfiguration, CoreSysAttributes):
|
||||
"""Set pwned is enabled/disabled."""
|
||||
self._data[ATTR_PWNED] = value
|
||||
|
||||
async def verify_own_content(self, checksum: str) -> Awaitable[None]:
|
||||
"""Verify content from HA org."""
|
||||
async def verify_content(self, signer: str, checksum: str) -> None:
|
||||
"""Verify content on CAS."""
|
||||
if not self.content_trust:
|
||||
_LOGGER.warning("Disabled content-trust, skip validation")
|
||||
return
|
||||
|
||||
try:
|
||||
await cas_validate(checksum=checksum, signer="notary@home-assistant.io")
|
||||
await cas_validate(signer, checksum)
|
||||
except CodeNotaryUntrusted:
|
||||
raise
|
||||
except CodeNotaryError:
|
||||
@ -71,6 +71,10 @@ class Security(FileConfiguration, CoreSysAttributes):
|
||||
raise
|
||||
return
|
||||
|
||||
def verify_own_content(self, checksum: str) -> Awaitable[None]:
|
||||
"""Verify content from HA org."""
|
||||
return self.verify_content("notary@home-assistant.io", checksum)
|
||||
|
||||
async def verify_secret(self, pwned_hash: str) -> None:
|
||||
"""Verify pwned state of a secret."""
|
||||
if not self.pwned:
|
||||
|
55
tests/test_security.py
Normal file
55
tests/test_security.py
Normal file
@ -0,0 +1,55 @@
|
||||
"""Testing handling with Security."""
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.exceptions import CodeNotaryError
|
||||
|
||||
|
||||
async def test_content_trust(coresys: CoreSys):
|
||||
"""Test Content-Trust."""
|
||||
|
||||
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
|
||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
||||
assert cas_validate.called
|
||||
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
|
||||
|
||||
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
|
||||
await coresys.security.verify_own_content("ffffffffffffff")
|
||||
assert cas_validate.called
|
||||
cas_validate.assert_called_once_with(
|
||||
"notary@home-assistant.io", "ffffffffffffff"
|
||||
)
|
||||
|
||||
|
||||
async def test_disabled_content_trust(coresys: CoreSys):
|
||||
"""Test Content-Trust."""
|
||||
coresys.security.content_trust = False
|
||||
|
||||
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
|
||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
||||
assert not cas_validate.called
|
||||
|
||||
with patch("supervisor.security.cas_validate", AsyncMock()) as cas_validate:
|
||||
await coresys.security.verify_own_content("ffffffffffffff")
|
||||
assert not cas_validate.called
|
||||
|
||||
|
||||
async def test_force_content_trust(coresys: CoreSys):
|
||||
"""Force Content-Trust tests."""
|
||||
|
||||
with patch(
|
||||
"supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError)
|
||||
) as cas_validate:
|
||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
||||
assert cas_validate.called
|
||||
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
|
||||
|
||||
coresys.security.force = True
|
||||
|
||||
with patch(
|
||||
"supervisor.security.cas_validate", AsyncMock(side_effect=CodeNotaryError)
|
||||
) as cas_validate:
|
||||
with pytest.raises(CodeNotaryError):
|
||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
Loading…
x
Reference in New Issue
Block a user