mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-07-19 15:16:33 +00:00
Full content trust support (#2735)
* Full content trust support * expose content-trust for info * fix log message * Add system test * Fix name * Add tests * ditch wrong disable * fix partition * remove untrust image
This commit is contained in:
parent
3e89f83e0b
commit
cb3e2dab71
@ -44,6 +44,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
dbus \
|
||||
network-manager \
|
||||
libpulse0 \
|
||||
&& bash <(curl https://getvcn.codenotary.com -L) \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Install Python dependencies from requirements.txt if it exists
|
||||
|
4
.github/workflows/ci.yaml
vendored
4
.github/workflows/ci.yaml
vendored
@ -353,6 +353,10 @@ jobs:
|
||||
id: python
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- name: Install CodeNotary
|
||||
shell: bash
|
||||
run: |
|
||||
bash <(curl https://getvcn.codenotary.com -L)
|
||||
- name: Restore Python virtual environment
|
||||
id: cache-venv
|
||||
uses: actions/cache@v2.1.4
|
||||
|
2
setup.py
2
setup.py
@ -44,7 +44,9 @@ setup(
|
||||
"supervisor.jobs",
|
||||
"supervisor.misc",
|
||||
"supervisor.plugins",
|
||||
"supervisor.resolution.checks",
|
||||
"supervisor.resolution.evaluations",
|
||||
"supervisor.resolution.fixups",
|
||||
"supervisor.resolution",
|
||||
"supervisor.services.modules",
|
||||
"supervisor.services",
|
||||
|
@ -36,6 +36,8 @@ class APIJobs(CoreSysAttributes):
|
||||
|
||||
self.sys_jobs.save_data()
|
||||
|
||||
await self.sys_resolution.evaluate.evaluate_system()
|
||||
|
||||
@api_process
|
||||
async def reset(self, request: web.Request) -> None:
|
||||
"""Reset options for JobManager."""
|
||||
|
@ -15,6 +15,7 @@ from ..const import (
|
||||
ATTR_BLK_READ,
|
||||
ATTR_BLK_WRITE,
|
||||
ATTR_CHANNEL,
|
||||
ATTR_CONTENT_TRUST,
|
||||
ATTR_CPU_PERCENT,
|
||||
ATTR_DEBUG,
|
||||
ATTR_DEBUG_BLOCK,
|
||||
@ -63,6 +64,7 @@ SCHEMA_OPTIONS = vol.Schema(
|
||||
vol.Optional(ATTR_DEBUG): vol.Boolean(),
|
||||
vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(),
|
||||
vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(),
|
||||
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
|
||||
}
|
||||
)
|
||||
|
||||
@ -112,6 +114,7 @@ class APISupervisor(CoreSysAttributes):
|
||||
ATTR_DEBUG: self.sys_config.debug,
|
||||
ATTR_DEBUG_BLOCK: self.sys_config.debug_block,
|
||||
ATTR_DIAGNOSTICS: self.sys_config.diagnostics,
|
||||
ATTR_CONTENT_TRUST: self.sys_config.content_trust,
|
||||
ATTR_ADDONS: list_addons,
|
||||
ATTR_ADDONS_REPOSITORIES: self.sys_config.addons_repositories,
|
||||
}
|
||||
@ -142,6 +145,9 @@ class APISupervisor(CoreSysAttributes):
|
||||
if ATTR_LOGGING in body:
|
||||
self.sys_config.logging = body[ATTR_LOGGING]
|
||||
|
||||
if ATTR_CONTENT_TRUST in body:
|
||||
self.sys_config.content_trust = body[ATTR_CONTENT_TRUST]
|
||||
|
||||
if ATTR_ADDONS_REPOSITORIES in body:
|
||||
new = set(body[ATTR_ADDONS_REPOSITORIES])
|
||||
await asyncio.shield(self.sys_store.update_repositories(new))
|
||||
@ -163,6 +169,8 @@ class APISupervisor(CoreSysAttributes):
|
||||
self.sys_updater.save_data()
|
||||
self.sys_config.save_data()
|
||||
|
||||
await self.sys_resolution.evaluate.evaluate_system()
|
||||
|
||||
@api_process
|
||||
async def stats(self, request: web.Request) -> Dict[str, Any]:
|
||||
"""Return resource information."""
|
||||
|
@ -9,6 +9,7 @@ from awesomeversion import AwesomeVersion
|
||||
|
||||
from .const import (
|
||||
ATTR_ADDONS_CUSTOM_LIST,
|
||||
ATTR_CONTENT_TRUST,
|
||||
ATTR_DEBUG,
|
||||
ATTR_DEBUG_BLOCK,
|
||||
ATTR_DIAGNOSTICS,
|
||||
@ -146,6 +147,16 @@ class CoreConfig(FileConfiguration):
|
||||
"""Set last boot datetime."""
|
||||
self._data[ATTR_LAST_BOOT] = value.isoformat()
|
||||
|
||||
@property
|
||||
def content_trust(self) -> bool:
|
||||
"""Return if content trust is enabled/disabled."""
|
||||
return self._data[ATTR_CONTENT_TRUST]
|
||||
|
||||
@content_trust.setter
|
||||
def content_trust(self, value: bool) -> None:
|
||||
"""Set content trust is enabled/disabled."""
|
||||
self._data[ATTR_CONTENT_TRUST] = value
|
||||
|
||||
@property
|
||||
def path_supervisor(self) -> Path:
|
||||
"""Return Supervisor data path."""
|
||||
|
@ -312,6 +312,7 @@ ATTR_WAIT_BOOT = "wait_boot"
|
||||
ATTR_WATCHDOG = "watchdog"
|
||||
ATTR_WEBUI = "webui"
|
||||
ATTR_WIFI = "wifi"
|
||||
ATTR_CONTENT_TRUST = "content_trust"
|
||||
|
||||
PROVIDE_SERVICE = "provide"
|
||||
NEED_SERVICE = "need"
|
||||
|
@ -2,6 +2,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Optional, TypeVar
|
||||
@ -12,6 +13,8 @@ import sentry_sdk
|
||||
from .config import CoreConfig
|
||||
from .const import ENV_SUPERVISOR_DEV
|
||||
from .docker import DockerAPI
|
||||
from .exceptions import CodeNotaryUntrusted
|
||||
from .resolution.const import UnhealthyReason
|
||||
from .utils.codenotary import vcn_validate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -41,6 +44,8 @@ if TYPE_CHECKING:
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CoreSys:
|
||||
"""Class that handle all shared data."""
|
||||
@ -613,8 +618,16 @@ class CoreSysAttributes:
|
||||
"""Capture a exception."""
|
||||
sentry_sdk.capture_exception(err)
|
||||
|
||||
def sys_verify_content(
|
||||
async def sys_verify_content(
|
||||
self, checksum: Optional[str] = None, path: Optional[Path] = None
|
||||
) -> Awaitable[bool]:
|
||||
) -> Awaitable[None]:
|
||||
"""Verify content from HA org."""
|
||||
return vcn_validate(checksum, path, org="home-assistant.io")
|
||||
if not self.sys_config.content_trust:
|
||||
_LOGGER.warning("Disabled content-trust, skip validation")
|
||||
return
|
||||
|
||||
try:
|
||||
await vcn_validate(checksum, path, org="home-assistant.io")
|
||||
except CodeNotaryUntrusted:
|
||||
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
|
||||
raise
|
||||
|
@ -630,3 +630,6 @@ class DockerAddon(DockerInterface):
|
||||
_LOGGER.warning("Can't update DNS for %s", self.name)
|
||||
self.sys_capture_exception(err)
|
||||
super()._stop(remove_container)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -105,3 +105,6 @@ class DockerAudio(DockerInterface, CoreSysAttributes):
|
||||
self.version,
|
||||
self.sys_docker.network.audio,
|
||||
)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -63,3 +63,6 @@ class DockerCli(DockerInterface, CoreSysAttributes):
|
||||
self.version,
|
||||
self.sys_docker.network.cli,
|
||||
)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -58,3 +58,6 @@ class DockerDNS(DockerInterface, CoreSysAttributes):
|
||||
self.version,
|
||||
self.sys_docker.network.dns,
|
||||
)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -210,3 +210,6 @@ class DockerHomeAssistant(DockerInterface):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -19,7 +19,15 @@ from ..const import (
|
||||
LABEL_VERSION,
|
||||
)
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import DockerAPIError, DockerError, DockerNotFound, DockerRequestError
|
||||
from ..exceptions import (
|
||||
CodeNotaryError,
|
||||
CodeNotaryUntrusted,
|
||||
DockerAPIError,
|
||||
DockerError,
|
||||
DockerNotFound,
|
||||
DockerRequestError,
|
||||
DockerTrustError,
|
||||
)
|
||||
from ..resolution.const import ContextType, IssueType, SuggestionType
|
||||
from ..utils import process_lock
|
||||
from .stats import DockerStats
|
||||
@ -162,7 +170,20 @@ class DockerInterface(CoreSysAttributes):
|
||||
# Try login if we have defined credentials
|
||||
self._docker_login(image)
|
||||
|
||||
# Pull new image
|
||||
docker_image = self.sys_docker.images.pull(f"{image}:{version!s}")
|
||||
|
||||
# Validate content
|
||||
try:
|
||||
self._validate_trust(docker_image.id)
|
||||
except CodeNotaryError:
|
||||
with suppress(docker.errors.DockerException):
|
||||
self.sys_docker.images.remove(
|
||||
image=f"{image}:{version!s}", force=True
|
||||
)
|
||||
raise
|
||||
|
||||
# Tag latest
|
||||
if latest:
|
||||
_LOGGER.info(
|
||||
"Tagging image %s with version %s as latest", image, version
|
||||
@ -182,9 +203,20 @@ class DockerInterface(CoreSysAttributes):
|
||||
)
|
||||
raise DockerError() from err
|
||||
except (docker.errors.DockerException, requests.RequestException) as err:
|
||||
_LOGGER.error("Unknown error with %s:%s -> %s", image, version, err)
|
||||
self.sys_capture_exception(err)
|
||||
raise DockerError() from err
|
||||
raise DockerError(
|
||||
f"Unknown error with {image}:{version} -> {err!s}", _LOGGER.error
|
||||
) from err
|
||||
except CodeNotaryUntrusted as err:
|
||||
raise DockerTrustError(
|
||||
f"Pulled image {image}:{version} failed on content-trust verification!",
|
||||
_LOGGER.critical,
|
||||
) from err
|
||||
except CodeNotaryError as err:
|
||||
raise DockerTrustError(
|
||||
f"Error happened on Content-Trust check for {image}:{version}: {err!s}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
else:
|
||||
self._meta = docker_image.attrs
|
||||
|
||||
@ -578,3 +610,11 @@ class DockerInterface(CoreSysAttributes):
|
||||
raise DockerError() from err
|
||||
|
||||
return CommandReturn(code, output)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
checksum = image_id.partition(":")[2]
|
||||
job = asyncio.run_coroutine_threadsafe(
|
||||
self.sys_verify_content(checksum=checksum), self.sys_loop
|
||||
)
|
||||
job.result(timeout=20)
|
||||
|
@ -52,3 +52,6 @@ class DockerMulticast(DockerInterface, CoreSysAttributes):
|
||||
_LOGGER.info(
|
||||
"Starting Multicast %s with version %s - Host", self.image, self.version
|
||||
)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -63,3 +63,6 @@ class DockerObserver(DockerInterface, CoreSysAttributes):
|
||||
self.version,
|
||||
self.sys_docker.network.observer,
|
||||
)
|
||||
|
||||
def _validate_trust(self, image_id: str) -> None:
|
||||
"""Validate trust of content."""
|
||||
|
@ -346,6 +346,10 @@ class DockerRequestError(DockerError):
|
||||
"""Dockerd OS issues."""
|
||||
|
||||
|
||||
class DockerTrustError(DockerError):
|
||||
"""Raise if images are not trusted."""
|
||||
|
||||
|
||||
class DockerNotFound(DockerError):
|
||||
"""Docker object don't Exists."""
|
||||
|
||||
|
@ -19,6 +19,7 @@ class ResolutionCheck(CoreSysAttributes):
|
||||
"""Initialize the checks class."""
|
||||
self.coresys = coresys
|
||||
self._checks: Dict[str, CheckBase] = {}
|
||||
|
||||
self._load()
|
||||
|
||||
@property
|
||||
|
@ -13,7 +13,7 @@ from ..const import ContextType, IssueType, SuggestionType
|
||||
from .base import CheckBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> "CheckAddonPwned":
|
||||
def setup(coresys: CoreSys) -> CheckBase:
|
||||
"""Check setup function."""
|
||||
return CheckAddonPwned(coresys)
|
||||
|
||||
|
@ -11,7 +11,7 @@ from ..const import ContextType, IssueType, SuggestionType
|
||||
from .base import CheckBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> "CheckCoreSecurity":
|
||||
def setup(coresys: CoreSys) -> CheckBase:
|
||||
"""Check setup function."""
|
||||
return CheckCoreSecurity(coresys)
|
||||
|
||||
|
@ -14,7 +14,7 @@ from ..data import Suggestion
|
||||
from .base import CheckBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> "CheckFreeSpace":
|
||||
def setup(coresys: CoreSys) -> CheckBase:
|
||||
"""Check setup function."""
|
||||
return CheckFreeSpace(coresys)
|
||||
|
||||
|
@ -37,6 +37,7 @@ class UnsupportedReason(str, Enum):
|
||||
PRIVILEGED = "privileged"
|
||||
SYSTEMD = "systemd"
|
||||
JOB_CONDITIONS = "job_conditions"
|
||||
CONTENT_TRUST = "content_trust"
|
||||
|
||||
|
||||
class UnhealthyReason(str, Enum):
|
||||
@ -46,6 +47,7 @@ class UnhealthyReason(str, Enum):
|
||||
SUPERVISOR = "supervisor"
|
||||
SETUP = "setup"
|
||||
PRIVILEGED = "privileged"
|
||||
UNTRUSTED = "untrusted"
|
||||
|
||||
|
||||
class IssueType(str, Enum):
|
||||
|
@ -1,20 +1,13 @@
|
||||
"""Helpers to evaluate the system."""
|
||||
from importlib import import_module
|
||||
import logging
|
||||
from typing import List, Set
|
||||
from typing import Dict, List, Set
|
||||
|
||||
from ..coresys import CoreSys, CoreSysAttributes
|
||||
from ..exceptions import ResolutionNotFound
|
||||
from .const import UnhealthyReason, UnsupportedReason
|
||||
from .evaluations.base import EvaluateBase
|
||||
from .evaluations.container import EvaluateContainer
|
||||
from .evaluations.dbus import EvaluateDbus
|
||||
from .evaluations.docker_configuration import EvaluateDockerConfiguration
|
||||
from .evaluations.docker_version import EvaluateDockerVersion
|
||||
from .evaluations.job_conditions import EvaluateJobConditions
|
||||
from .evaluations.lxc import EvaluateLxc
|
||||
from .evaluations.network_manager import EvaluateNetworkManager
|
||||
from .evaluations.operating_system import EvaluateOperatingSystem
|
||||
from .evaluations.privileged import EvaluatePrivileged
|
||||
from .evaluations.systemd import EvaluateSystemd
|
||||
from .validate import get_valid_modules
|
||||
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
@ -32,41 +25,36 @@ class ResolutionEvaluation(CoreSysAttributes):
|
||||
def __init__(self, coresys: CoreSys) -> None:
|
||||
"""Initialize the evaluation class."""
|
||||
self.coresys = coresys
|
||||
|
||||
self.cached_images: Set[str] = set()
|
||||
self._evalutions: Dict[str, EvaluateBase] = {}
|
||||
|
||||
self._container = EvaluateContainer(coresys)
|
||||
self._dbus = EvaluateDbus(coresys)
|
||||
self._docker_configuration = EvaluateDockerConfiguration(coresys)
|
||||
self._docker_version = EvaluateDockerVersion(coresys)
|
||||
self._lxc = EvaluateLxc(coresys)
|
||||
self._network_manager = EvaluateNetworkManager(coresys)
|
||||
self._operating_system = EvaluateOperatingSystem(coresys)
|
||||
self._privileged = EvaluatePrivileged(coresys)
|
||||
self._systemd = EvaluateSystemd(coresys)
|
||||
self._job_conditions = EvaluateJobConditions(coresys)
|
||||
self._load()
|
||||
|
||||
@property
|
||||
def all_evalutions(self) -> List[EvaluateBase]:
|
||||
"""Return list of all evaluations."""
|
||||
return [
|
||||
self._container,
|
||||
self._dbus,
|
||||
self._docker_configuration,
|
||||
self._docker_version,
|
||||
self._lxc,
|
||||
self._network_manager,
|
||||
self._operating_system,
|
||||
self._privileged,
|
||||
self._systemd,
|
||||
self._job_conditions,
|
||||
]
|
||||
def all_evaluations(self) -> List[EvaluateBase]:
|
||||
"""Return all list of all checks."""
|
||||
return list(self._evalutions.values())
|
||||
|
||||
def _load(self):
|
||||
"""Load all checks."""
|
||||
package = f"{__package__}.evaluations"
|
||||
for module in get_valid_modules("evaluations"):
|
||||
check_module = import_module(f"{package}.{module}")
|
||||
check = check_module.setup(self.coresys)
|
||||
self._evalutions[check.slug] = check
|
||||
|
||||
def get(self, slug: str) -> EvaluateBase:
|
||||
"""Return check based on slug."""
|
||||
if slug in self._evalutions:
|
||||
return self._evalutions[slug]
|
||||
|
||||
raise ResolutionNotFound(f"Check with slug {slug} not found!")
|
||||
|
||||
async def evaluate_system(self) -> None:
|
||||
"""Evaluate the system."""
|
||||
_LOGGER.info("Starting system evaluation with state %s", self.sys_core.state)
|
||||
|
||||
for evaluation in self.all_evalutions:
|
||||
for evaluation in self.all_evaluations:
|
||||
try:
|
||||
await evaluation()
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
|
@ -43,6 +43,11 @@ class EvaluateBase(ABC, CoreSysAttributes):
|
||||
def reason(self) -> UnsupportedReason:
|
||||
"""Return a UnsupportedReason enum."""
|
||||
|
||||
@property
|
||||
def slug(self) -> str:
|
||||
"""Return the check slug."""
|
||||
return self.__class__.__module__.split(".")[-1]
|
||||
|
||||
@property
|
||||
@abstractproperty
|
||||
def on_failure(self) -> str:
|
||||
|
@ -18,6 +18,11 @@ DOCKER_IMAGE_DENYLIST = [
|
||||
]
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateContainer(coresys)
|
||||
|
||||
|
||||
class EvaluateContainer(EvaluateBase):
|
||||
"""Evaluate container."""
|
||||
|
||||
|
35
supervisor/resolution/evaluations/content_trust.py
Normal file
35
supervisor/resolution/evaluations/content_trust.py
Normal file
@ -0,0 +1,35 @@
|
||||
"""Evaluation class for Content Trust."""
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateContentTrust(coresys)
|
||||
|
||||
|
||||
class EvaluateContentTrust(EvaluateBase):
|
||||
"""Evaluate system content trust level."""
|
||||
|
||||
@property
|
||||
def reason(self) -> UnsupportedReason:
|
||||
"""Return a UnsupportedReason enum."""
|
||||
return UnsupportedReason.CONTENT_TRUST
|
||||
|
||||
@property
|
||||
def on_failure(self) -> str:
|
||||
"""Return a string that is printed when self.evaluate is False."""
|
||||
return "System run with disabled trusted content security."
|
||||
|
||||
@property
|
||||
def states(self) -> List[CoreState]:
|
||||
"""Return a list of valid states when this evaluation can run."""
|
||||
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]
|
||||
|
||||
async def evaluate(self) -> None:
|
||||
"""Run evaluation."""
|
||||
return not self.sys_config.content_trust
|
@ -2,10 +2,16 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import SOCKET_DBUS, CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateDbus(coresys)
|
||||
|
||||
|
||||
class EvaluateDbus(EvaluateBase):
|
||||
"""Evaluate dbus."""
|
||||
|
||||
|
@ -3,6 +3,7 @@ import logging
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
@ -12,6 +13,11 @@ EXPECTED_STORAGE = "overlay2"
|
||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateDockerConfiguration(coresys)
|
||||
|
||||
|
||||
class EvaluateDockerConfiguration(EvaluateBase):
|
||||
"""Evaluate Docker configuration."""
|
||||
|
||||
|
@ -2,10 +2,16 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateDockerVersion(coresys)
|
||||
|
||||
|
||||
class EvaluateDockerVersion(EvaluateBase):
|
||||
"""Evaluate Docker version."""
|
||||
|
||||
|
@ -2,10 +2,16 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateJobConditions(coresys)
|
||||
|
||||
|
||||
class EvaluateJobConditions(EvaluateBase):
|
||||
"""Evaluate job conditions."""
|
||||
|
||||
|
@ -4,10 +4,16 @@ from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateLxc(coresys)
|
||||
|
||||
|
||||
class EvaluateLxc(EvaluateBase):
|
||||
"""Evaluate if running inside LXC."""
|
||||
|
||||
|
@ -2,10 +2,16 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState, HostFeature
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateNetworkManager(coresys)
|
||||
|
||||
|
||||
class EvaluateNetworkManager(EvaluateBase):
|
||||
"""Evaluate network manager."""
|
||||
|
||||
|
@ -2,12 +2,18 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
SUPPORTED_OS = ["Debian GNU/Linux 10 (buster)"]
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateOperatingSystem(coresys)
|
||||
|
||||
|
||||
class EvaluateOperatingSystem(EvaluateBase):
|
||||
"""Evaluate the operating system."""
|
||||
|
||||
|
@ -2,10 +2,16 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluatePrivileged(coresys)
|
||||
|
||||
|
||||
class EvaluatePrivileged(EvaluateBase):
|
||||
"""Evaluate Privileged mode."""
|
||||
|
||||
|
@ -2,10 +2,16 @@
|
||||
from typing import List
|
||||
|
||||
from ...const import CoreState, HostFeature
|
||||
from ...coresys import CoreSys
|
||||
from ..const import UnsupportedReason
|
||||
from .base import EvaluateBase
|
||||
|
||||
|
||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
||||
"""Initialize evaluation-setup function."""
|
||||
return EvaluateSystemd(coresys)
|
||||
|
||||
|
||||
class EvaluateSystemd(EvaluateBase):
|
||||
"""Evaluate systemd."""
|
||||
|
||||
|
@ -201,10 +201,15 @@ class Updater(FileConfiguration, CoreSysAttributes):
|
||||
# Validate
|
||||
try:
|
||||
await self.sys_verify_content(checksum=calc_checksum(data))
|
||||
except CodeNotaryUntrusted:
|
||||
_LOGGER.critical("Content-Trust is broken for the version file fetch!")
|
||||
except CodeNotaryUntrusted as err:
|
||||
raise UpdaterError(
|
||||
"Content-Trust is broken for the version file fetch!", _LOGGER.critical
|
||||
) from err
|
||||
except CodeNotaryError as err:
|
||||
_LOGGER.error("CodeNotary error while processing version checks: %s", err)
|
||||
raise UpdaterError(
|
||||
f"CodeNotary error while processing version checks: {err!s}",
|
||||
_LOGGER.error,
|
||||
) from err
|
||||
|
||||
# Parse data
|
||||
try:
|
||||
|
@ -11,6 +11,7 @@ from .const import (
|
||||
ATTR_AUDIO,
|
||||
ATTR_CHANNEL,
|
||||
ATTR_CLI,
|
||||
ATTR_CONTENT_TRUST,
|
||||
ATTR_DEBUG,
|
||||
ATTR_DEBUG_BLOCK,
|
||||
ATTR_DIAGNOSTICS,
|
||||
@ -147,6 +148,7 @@ SCHEMA_SUPERVISOR_CONFIG = vol.Schema(
|
||||
vol.Optional(ATTR_DEBUG, default=False): vol.Boolean(),
|
||||
vol.Optional(ATTR_DEBUG_BLOCK, default=False): vol.Boolean(),
|
||||
vol.Optional(ATTR_DIAGNOSTICS, default=None): vol.Maybe(vol.Boolean()),
|
||||
vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(),
|
||||
},
|
||||
extra=vol.REMOVE_EXTRA,
|
||||
)
|
||||
|
@ -1,11 +1,11 @@
|
||||
"""Common test functions."""
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
import re
|
||||
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
from aiohttp import web
|
||||
from aiohttp.test_utils import TestClient
|
||||
from awesomeversion import AwesomeVersion
|
||||
import pytest
|
||||
|
||||
@ -130,9 +130,6 @@ async def coresys(loop, docker, network_manager, aiohttp_client) -> CoreSys:
|
||||
), patch(
|
||||
"supervisor.bootstrap.fetch_timezone",
|
||||
return_value="Europe/Zurich",
|
||||
), patch(
|
||||
"aiohttp.ClientSession",
|
||||
return_value=TestClient.session,
|
||||
):
|
||||
coresys_obj = await initialize_coresys()
|
||||
|
||||
@ -167,6 +164,10 @@ async def coresys(loop, docker, network_manager, aiohttp_client) -> CoreSys:
|
||||
|
||||
yield coresys_obj
|
||||
|
||||
await asyncio.gather(
|
||||
coresys_obj.websession.close(), coresys_obj.websession_ssl.close()
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sys_machine():
|
||||
|
45
tests/resolution/evaluation/test_evaluate_content_trust.py
Normal file
45
tests/resolution/evaluation/test_evaluate_content_trust.py
Normal file
@ -0,0 +1,45 @@
|
||||
"""Test evaluation base."""
|
||||
# pylint: disable=import-error,protected-access
|
||||
from unittest.mock import patch
|
||||
|
||||
from supervisor.const import CoreState
|
||||
from supervisor.coresys import CoreSys
|
||||
from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust
|
||||
|
||||
|
||||
async def test_evaluation(coresys: CoreSys):
|
||||
"""Test evaluation."""
|
||||
job_conditions = EvaluateContentTrust(coresys)
|
||||
coresys.core.state = CoreState.SETUP
|
||||
|
||||
await job_conditions()
|
||||
assert job_conditions.reason not in coresys.resolution.unsupported
|
||||
|
||||
coresys.config.content_trust = False
|
||||
await job_conditions()
|
||||
assert job_conditions.reason in coresys.resolution.unsupported
|
||||
|
||||
|
||||
async def test_did_run(coresys: CoreSys):
|
||||
"""Test that the evaluation ran as expected."""
|
||||
job_conditions = EvaluateContentTrust(coresys)
|
||||
should_run = job_conditions.states
|
||||
should_not_run = [state for state in CoreState if state not in should_run]
|
||||
assert len(should_run) != 0
|
||||
assert len(should_not_run) != 0
|
||||
|
||||
with patch(
|
||||
"supervisor.resolution.evaluations.content_trust.EvaluateContentTrust.evaluate",
|
||||
return_value=None,
|
||||
) as evaluate:
|
||||
for state in should_run:
|
||||
coresys.core.state = state
|
||||
await job_conditions()
|
||||
evaluate.assert_called_once()
|
||||
evaluate.reset_mock()
|
||||
|
||||
for state in should_not_run:
|
||||
coresys.core.state = state
|
||||
await job_conditions()
|
||||
evaluate.assert_not_called()
|
||||
evaluate.reset_mock()
|
49
tests/test_updater.py
Normal file
49
tests/test_updater.py
Normal file
@ -0,0 +1,49 @@
|
||||
"""Test updater files."""
|
||||
|
||||
import pytest
|
||||
|
||||
from supervisor.coresys import CoreSys
|
||||
|
||||
URL_TEST = "https://version.home-assistant.io/stable.json"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetch_versions(coresys: CoreSys) -> None:
|
||||
"""Test download and sync version."""
|
||||
|
||||
await coresys.updater.fetch_data()
|
||||
|
||||
async with coresys.websession.get(URL_TEST) as request:
|
||||
data = await request.json()
|
||||
|
||||
assert coresys.updater.version_supervisor == data["supervisor"]
|
||||
assert coresys.updater.version_homeassistant == data["homeassistant"]["default"]
|
||||
|
||||
assert coresys.updater.version_audio == data["audio"]
|
||||
assert coresys.updater.version_cli == data["cli"]
|
||||
assert coresys.updater.version_dns == data["dns"]
|
||||
assert coresys.updater.version_multicast == data["multicast"]
|
||||
assert coresys.updater.version_observer == data["observer"]
|
||||
|
||||
assert coresys.updater.image_homeassistant == data["image"]["core"].format(
|
||||
machine=coresys.machine
|
||||
)
|
||||
|
||||
assert coresys.updater.image_supervisor == data["image"]["supervisor"].format(
|
||||
arch=coresys.arch.supervisor
|
||||
)
|
||||
assert coresys.updater.image_cli == data["image"]["cli"].format(
|
||||
arch=coresys.arch.supervisor
|
||||
)
|
||||
assert coresys.updater.image_audio == data["image"]["audio"].format(
|
||||
arch=coresys.arch.supervisor
|
||||
)
|
||||
assert coresys.updater.image_dns == data["image"]["dns"].format(
|
||||
arch=coresys.arch.supervisor
|
||||
)
|
||||
assert coresys.updater.image_observer == data["image"]["observer"].format(
|
||||
arch=coresys.arch.supervisor
|
||||
)
|
||||
assert coresys.updater.image_multicast == data["image"]["multicast"].format(
|
||||
arch=coresys.arch.supervisor
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user