diff --git a/.github/workflows/builder.yml b/.github/workflows/builder.yml index 2ef857f97..08597763e 100644 --- a/.github/workflows/builder.yml +++ b/.github/workflows/builder.yml @@ -170,8 +170,6 @@ jobs: --target /data \ --cosign \ --generic ${{ needs.init.outputs.version }} - env: - CAS_API_KEY: ${{ secrets.CAS_TOKEN }} version: name: Update version @@ -293,33 +291,6 @@ jobs: exit 1 fi - - name: Check the Supervisor code sign - if: needs.init.outputs.publish == 'true' - run: | - echo "Enable Content-Trust" - test=$(docker exec hassio_cli ha security options --content-trust=true --no-progress --raw-json | jq -r '.result') - if [ "$test" != "ok" ]; then - exit 1 - fi - - echo "Run supervisor health check" - test=$(docker exec hassio_cli ha resolution healthcheck --no-progress --raw-json | jq -r '.result') - if [ "$test" != "ok" ]; then - exit 1 - fi - - echo "Check supervisor unhealthy" - test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unhealthy[]') - if [ "$test" != "" ]; then - exit 1 - fi - - echo "Check supervisor supported" - test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unsupported[]') - if [[ "$test" =~ source_mods ]]; then - exit 1 - fi - - name: Create full backup id: backup run: | diff --git a/supervisor/addons/addon.py b/supervisor/addons/addon.py index c80d53450..8a9f300ea 100644 --- a/supervisor/addons/addon.py +++ b/supervisor/addons/addon.py @@ -1513,13 +1513,6 @@ class Addon(AddonModel): _LOGGER.info("Finished restore for add-on %s", self.slug) return wait_for_start - def check_trust(self) -> Awaitable[None]: - """Calculate Addon docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - @Job( name="addon_restart_after_problem", throttle_period=WATCHDOG_THROTTLE_PERIOD, diff --git a/supervisor/addons/model.py b/supervisor/addons/model.py index b9ccc427b..b7f8c8bc6 100644 --- a/supervisor/addons/model.py +++ b/supervisor/addons/model.py @@ -103,7 +103,6 @@ from .configuration import FolderMapping from .const import ( ATTR_BACKUP, ATTR_BREAKING_VERSIONS, - ATTR_CODENOTARY, ATTR_PATH, ATTR_READ_ONLY, AddonBackupMode, @@ -632,13 +631,8 @@ class AddonModel(JobGroup, ABC): @property def signed(self) -> bool: - """Return True if the image is signed.""" - return ATTR_CODENOTARY in self.data - - @property - def codenotary(self) -> str | None: - """Return Signer email address for CAS.""" - return self.data.get(ATTR_CODENOTARY) + """Currently no signing support.""" + return False @property def breaking_versions(self) -> list[AwesomeVersion]: diff --git a/supervisor/addons/validate.py b/supervisor/addons/validate.py index c9703bef5..e51754c31 100644 --- a/supervisor/addons/validate.py +++ b/supervisor/addons/validate.py @@ -207,6 +207,12 @@ def _warn_addon_config(config: dict[str, Any]): name, ) + if ATTR_CODENOTARY in config: + _LOGGER.warning( + "Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.", + name, + ) + return config @@ -417,7 +423,6 @@ _SCHEMA_ADDON_CONFIG = vol.Schema( vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce( AddonBackupMode ), - vol.Optional(ATTR_CODENOTARY): vol.Email(), vol.Optional(ATTR_OPTIONS, default={}): dict, vol.Optional(ATTR_SCHEMA, default={}): vol.Any( vol.Schema({str: SCHEMA_ELEMENT}), diff --git a/supervisor/api/security.py b/supervisor/api/security.py index ebafe62ee..6ddc59756 100644 --- a/supervisor/api/security.py +++ b/supervisor/api/security.py @@ -1,24 +1,20 @@ """Init file for Supervisor Security RESTful API.""" -import asyncio -import logging from typing import Any from aiohttp import web -import attr import voluptuous as vol -from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED +from supervisor.exceptions import APIGone + +from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED from ..coresys import CoreSysAttributes from .utils import api_process, api_validate -_LOGGER: logging.Logger = logging.getLogger(__name__) - # pylint: disable=no-value-for-parameter SCHEMA_OPTIONS = vol.Schema( { vol.Optional(ATTR_PWNED): vol.Boolean(), - vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(), vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(), } ) @@ -31,7 +27,6 @@ class APISecurity(CoreSysAttributes): async def info(self, request: web.Request) -> dict[str, Any]: """Return Security information.""" return { - ATTR_CONTENT_TRUST: self.sys_security.content_trust, ATTR_PWNED: self.sys_security.pwned, ATTR_FORCE_SECURITY: self.sys_security.force, } @@ -43,8 +38,6 @@ class APISecurity(CoreSysAttributes): if ATTR_PWNED in body: self.sys_security.pwned = body[ATTR_PWNED] - if ATTR_CONTENT_TRUST in body: - self.sys_security.content_trust = body[ATTR_CONTENT_TRUST] if ATTR_FORCE_SECURITY in body: self.sys_security.force = body[ATTR_FORCE_SECURITY] @@ -54,6 +47,9 @@ class APISecurity(CoreSysAttributes): @api_process async def integrity_check(self, request: web.Request) -> dict[str, Any]: - """Run backend integrity check.""" - result = await asyncio.shield(self.sys_security.integrity_check()) - return attr.asdict(result) + """Run backend integrity check. + + CodeNotary integrity checking has been removed. This endpoint now returns + an error indicating the feature is gone. + """ + raise APIGone("Integrity check feature has been removed.") diff --git a/supervisor/api/supervisor.py b/supervisor/api/supervisor.py index cc4a62839..3e1dc38e3 100644 --- a/supervisor/api/supervisor.py +++ b/supervisor/api/supervisor.py @@ -16,14 +16,12 @@ from ..const import ( ATTR_BLK_READ, ATTR_BLK_WRITE, ATTR_CHANNEL, - ATTR_CONTENT_TRUST, ATTR_COUNTRY, ATTR_CPU_PERCENT, ATTR_DEBUG, ATTR_DEBUG_BLOCK, ATTR_DETECT_BLOCKING_IO, ATTR_DIAGNOSTICS, - ATTR_FORCE_SECURITY, ATTR_HEALTHY, ATTR_ICON, ATTR_IP_ADDRESS, @@ -69,8 +67,6 @@ SCHEMA_OPTIONS = vol.Schema( vol.Optional(ATTR_DEBUG): vol.Boolean(), vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(), vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(), - vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(), - vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(), vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(), vol.Optional(ATTR_DETECT_BLOCKING_IO): vol.Coerce(DetectBlockingIO), vol.Optional(ATTR_COUNTRY): str, diff --git a/supervisor/bootstrap.py b/supervisor/bootstrap.py index b9eb5201a..0726bdb84 100644 --- a/supervisor/bootstrap.py +++ b/supervisor/bootstrap.py @@ -105,7 +105,6 @@ async def initialize_coresys() -> CoreSys: if coresys.dev: coresys.updater.channel = UpdateChannel.DEV - coresys.security.content_trust = False # Convert datetime logging.Formatter.converter = lambda *args: coresys.now().timetuple() diff --git a/supervisor/docker/addon.py b/supervisor/docker/addon.py index 014279485..e50c8da74 100644 --- a/supervisor/docker/addon.py +++ b/supervisor/docker/addon.py @@ -846,16 +846,6 @@ class DockerAddon(DockerInterface): ): self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue) - async def _validate_trust(self, image_id: str) -> None: - """Validate trust of content.""" - if not self.addon.signed: - return - - checksum = image_id.partition(":")[2] - return await self.sys_security.verify_content( - cast(str, self.addon.codenotary), checksum - ) - @Job( name="docker_addon_hardware_events", conditions=[JobCondition.OS_AGENT], diff --git a/supervisor/docker/homeassistant.py b/supervisor/docker/homeassistant.py index b92559b55..ba559ceed 100644 --- a/supervisor/docker/homeassistant.py +++ b/supervisor/docker/homeassistant.py @@ -5,7 +5,7 @@ from ipaddress import IPv4Address import logging import re -from awesomeversion import AwesomeVersion, AwesomeVersionCompareException +from awesomeversion import AwesomeVersion from docker.types import Mount from ..const import LABEL_MACHINE @@ -244,13 +244,3 @@ class DockerHomeAssistant(DockerInterface): self.image, self.sys_homeassistant.version, ) - - async def _validate_trust(self, image_id: str) -> None: - """Validate trust of content.""" - try: - if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST: - return - except AwesomeVersionCompareException: - return - - await super()._validate_trust(image_id) diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index b6efddecb..b70e4ea47 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -31,15 +31,12 @@ from ..const import ( ) from ..coresys import CoreSys from ..exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, DockerAPIError, DockerError, DockerJobError, DockerLogOutOfOrder, DockerNotFound, DockerRequestError, - DockerTrustError, ) from ..jobs import SupervisorJob from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency @@ -425,18 +422,6 @@ class DockerInterface(JobGroup, ABC): platform=MAP_ARCH[image_arch], ) - # Validate content - try: - await self._validate_trust(cast(str, docker_image.id)) - except CodeNotaryError: - with suppress(docker.errors.DockerException): - await self.sys_run_in_executor( - self.sys_docker.images.remove, - image=f"{image}:{version!s}", - force=True, - ) - raise - # Tag latest if latest: _LOGGER.info( @@ -462,16 +447,6 @@ class DockerInterface(JobGroup, ABC): raise DockerError( f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error ) from err - except CodeNotaryUntrusted as err: - raise DockerTrustError( - f"Pulled image {image}:{version!s} failed on content-trust verification!", - _LOGGER.critical, - ) from err - except CodeNotaryError as err: - raise DockerTrustError( - f"Error happened on Content-Trust check for {image}:{version!s}: {err!s}", - _LOGGER.error, - ) from err finally: if listener: self.sys_bus.remove_listener(listener) @@ -809,24 +784,3 @@ class DockerInterface(JobGroup, ABC): return self.sys_run_in_executor( self.sys_docker.container_run_inside, self.name, command ) - - async def _validate_trust(self, image_id: str) -> None: - """Validate trust of content.""" - checksum = image_id.partition(":")[2] - return await self.sys_security.verify_own_content(checksum) - - @Job( - name="docker_interface_check_trust", - on_condition=DockerJobError, - concurrency=JobConcurrency.GROUP_REJECT, - ) - async def check_trust(self) -> None: - """Check trust of exists Docker image.""" - try: - image = await self.sys_run_in_executor( - self.sys_docker.images.get, f"{self.image}:{self.version!s}" - ) - except (docker.errors.DockerException, requests.RequestException): - return - - await self._validate_trust(cast(str, image.id)) diff --git a/supervisor/exceptions.py b/supervisor/exceptions.py index 5f9625ab3..5906c4572 100644 --- a/supervisor/exceptions.py +++ b/supervisor/exceptions.py @@ -423,6 +423,12 @@ class APINotFound(APIError): status = 404 +class APIGone(APIError): + """API is no longer available.""" + + status = 410 + + class APIAddonNotInstalled(APIError): """Not installed addon requested at addons API.""" @@ -577,21 +583,6 @@ class PwnedConnectivityError(PwnedError): """Connectivity errors while checking pwned passwords.""" -# util/codenotary - - -class CodeNotaryError(HassioError): - """Error general with CodeNotary.""" - - -class CodeNotaryUntrusted(CodeNotaryError): - """Error on untrusted content.""" - - -class CodeNotaryBackendError(CodeNotaryError): - """CodeNotary backend error happening.""" - - # util/whoami diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index 4eb7e3367..f648e849f 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -428,13 +428,6 @@ class HomeAssistantCore(JobGroup): """ return self.instance.logs() - def check_trust(self) -> Awaitable[None]: - """Calculate HomeAssistant docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - async def stats(self) -> DockerStats: """Return stats of Home Assistant.""" try: diff --git a/supervisor/plugins/base.py b/supervisor/plugins/base.py index 4920b4d2e..002a61cbb 100644 --- a/supervisor/plugins/base.py +++ b/supervisor/plugins/base.py @@ -76,13 +76,6 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes): """Return True if a task is in progress.""" return self.instance.in_progress - def check_trust(self) -> Awaitable[None]: - """Calculate plugin docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - def logs(self) -> Awaitable[bytes]: """Get docker plugin logs. diff --git a/supervisor/resolution/checks/supervisor_trust.py b/supervisor/resolution/checks/supervisor_trust.py deleted file mode 100644 index b4be4329c..000000000 --- a/supervisor/resolution/checks/supervisor_trust.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Helpers to check supervisor trust.""" - -import logging - -from ...const import CoreState -from ...coresys import CoreSys -from ...exceptions import CodeNotaryError, CodeNotaryUntrusted -from ..const import ContextType, IssueType, UnhealthyReason -from .base import CheckBase - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> CheckBase: - """Check setup function.""" - return CheckSupervisorTrust(coresys) - - -class CheckSupervisorTrust(CheckBase): - """CheckSystemTrust class for check.""" - - async def run_check(self) -> None: - """Run check if not affected by issue.""" - if not self.sys_security.content_trust: - _LOGGER.warning( - "Skipping %s, content_trust is globally disabled", self.slug - ) - return - - try: - await self.sys_supervisor.check_trust() - except CodeNotaryUntrusted: - self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED) - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) - except CodeNotaryError: - pass - - async def approve_check(self, reference: str | None = None) -> bool: - """Approve check if it is affected by issue.""" - try: - await self.sys_supervisor.check_trust() - except CodeNotaryError: - return True - return False - - @property - def issue(self) -> IssueType: - """Return a IssueType enum.""" - return IssueType.TRUST - - @property - def context(self) -> ContextType: - """Return a ContextType enum.""" - return ContextType.SUPERVISOR - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this check can run.""" - return [CoreState.RUNNING, CoreState.STARTUP] diff --git a/supervisor/resolution/const.py b/supervisor/resolution/const.py index 9492810b1..206c56223 100644 --- a/supervisor/resolution/const.py +++ b/supervisor/resolution/const.py @@ -39,7 +39,6 @@ class UnsupportedReason(StrEnum): APPARMOR = "apparmor" CGROUP_VERSION = "cgroup_version" CONNECTIVITY_CHECK = "connectivity_check" - CONTENT_TRUST = "content_trust" DBUS = "dbus" DNS_SERVER = "dns_server" DOCKER_CONFIGURATION = "docker_configuration" @@ -54,7 +53,6 @@ class UnsupportedReason(StrEnum): PRIVILEGED = "privileged" RESTART_POLICY = "restart_policy" SOFTWARE = "software" - SOURCE_MODS = "source_mods" SUPERVISOR_VERSION = "supervisor_version" SYSTEMD = "systemd" SYSTEMD_JOURNAL = "systemd_journal" @@ -103,7 +101,6 @@ class IssueType(StrEnum): PWNED = "pwned" REBOOT_REQUIRED = "reboot_required" SECURITY = "security" - TRUST = "trust" UPDATE_FAILED = "update_failed" UPDATE_ROLLBACK = "update_rollback" @@ -115,7 +112,6 @@ class SuggestionType(StrEnum): CLEAR_FULL_BACKUP = "clear_full_backup" CREATE_FULL_BACKUP = "create_full_backup" DISABLE_BOOT = "disable_boot" - EXECUTE_INTEGRITY = "execute_integrity" EXECUTE_REBOOT = "execute_reboot" EXECUTE_REBUILD = "execute_rebuild" EXECUTE_RELOAD = "execute_reload" diff --git a/supervisor/resolution/evaluations/content_trust.py b/supervisor/resolution/evaluations/content_trust.py deleted file mode 100644 index c5648fd0a..000000000 --- a/supervisor/resolution/evaluations/content_trust.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Evaluation class for Content Trust.""" - -from ...const import CoreState -from ...coresys import CoreSys -from ..const import UnsupportedReason -from .base import EvaluateBase - - -def setup(coresys: CoreSys) -> EvaluateBase: - """Initialize evaluation-setup function.""" - return EvaluateContentTrust(coresys) - - -class EvaluateContentTrust(EvaluateBase): - """Evaluate system content trust level.""" - - @property - def reason(self) -> UnsupportedReason: - """Return a UnsupportedReason enum.""" - return UnsupportedReason.CONTENT_TRUST - - @property - def on_failure(self) -> str: - """Return a string that is printed when self.evaluate is True.""" - return "System run with disabled trusted content security." - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this evaluation can run.""" - return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING] - - async def evaluate(self) -> bool: - """Run evaluation.""" - return not self.sys_security.content_trust diff --git a/supervisor/resolution/evaluations/source_mods.py b/supervisor/resolution/evaluations/source_mods.py deleted file mode 100644 index 8bca674a6..000000000 --- a/supervisor/resolution/evaluations/source_mods.py +++ /dev/null @@ -1,72 +0,0 @@ -"""Evaluation class for Content Trust.""" - -import errno -import logging -from pathlib import Path - -from ...const import CoreState -from ...coresys import CoreSys -from ...exceptions import CodeNotaryError, CodeNotaryUntrusted -from ...utils.codenotary import calc_checksum_path_sourcecode -from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason -from .base import EvaluateBase - -_SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor") -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> EvaluateBase: - """Initialize evaluation-setup function.""" - return EvaluateSourceMods(coresys) - - -class EvaluateSourceMods(EvaluateBase): - """Evaluate supervisor source modifications.""" - - @property - def reason(self) -> UnsupportedReason: - """Return a UnsupportedReason enum.""" - return UnsupportedReason.SOURCE_MODS - - @property - def on_failure(self) -> str: - """Return a string that is printed when self.evaluate is True.""" - return "System detect unauthorized source code modifications." - - @property - def states(self) -> list[CoreState]: - """Return a list of valid states when this evaluation can run.""" - return [CoreState.RUNNING] - - async def evaluate(self) -> bool: - """Run evaluation.""" - if not self.sys_security.content_trust: - _LOGGER.warning("Disabled content-trust, skipping evaluation") - return False - - # Calculate sume of the sourcecode - try: - checksum = await self.sys_run_in_executor( - calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE - ) - except OSError as err: - if err.errno == errno.EBADMSG: - self.sys_resolution.add_unhealthy_reason( - UnhealthyReason.OSERROR_BAD_MESSAGE - ) - - self.sys_resolution.create_issue( - IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM - ) - _LOGGER.error("Can't calculate checksum of source code: %s", err) - return False - - # Validate checksum - try: - await self.sys_security.verify_own_content(checksum) - except CodeNotaryUntrusted: - return True - except CodeNotaryError: - pass - - return False diff --git a/supervisor/resolution/fixups/system_execute_integrity.py b/supervisor/resolution/fixups/system_execute_integrity.py deleted file mode 100644 index a908b09b0..000000000 --- a/supervisor/resolution/fixups/system_execute_integrity.py +++ /dev/null @@ -1,67 +0,0 @@ -"""Helpers to check and fix issues with free space.""" - -from datetime import timedelta -import logging - -from ...coresys import CoreSys -from ...exceptions import ResolutionFixupError, ResolutionFixupJobError -from ...jobs.const import JobCondition, JobThrottle -from ...jobs.decorator import Job -from ...security.const import ContentTrustResult -from ..const import ContextType, IssueType, SuggestionType -from .base import FixupBase - -_LOGGER: logging.Logger = logging.getLogger(__name__) - - -def setup(coresys: CoreSys) -> FixupBase: - """Check setup function.""" - return FixupSystemExecuteIntegrity(coresys) - - -class FixupSystemExecuteIntegrity(FixupBase): - """Storage class for fixup.""" - - @Job( - name="fixup_system_execute_integrity_process", - conditions=[JobCondition.INTERNET_SYSTEM], - on_condition=ResolutionFixupJobError, - throttle_period=timedelta(hours=8), - throttle=JobThrottle.THROTTLE, - ) - async def process_fixup(self, reference: str | None = None) -> None: - """Initialize the fixup class.""" - result = await self.sys_security.integrity_check() - - if ContentTrustResult.FAILED in (result.core, result.supervisor): - raise ResolutionFixupError() - - for plugin in result.plugins: - if plugin != ContentTrustResult.FAILED: - continue - raise ResolutionFixupError() - - for addon in result.addons: - if addon != ContentTrustResult.FAILED: - continue - raise ResolutionFixupError() - - @property - def suggestion(self) -> SuggestionType: - """Return a SuggestionType enum.""" - return SuggestionType.EXECUTE_INTEGRITY - - @property - def context(self) -> ContextType: - """Return a ContextType enum.""" - return ContextType.SYSTEM - - @property - def issues(self) -> list[IssueType]: - """Return a IssueType enum list.""" - return [IssueType.TRUST] - - @property - def auto(self) -> bool: - """Return if a fixup can be apply as auto fix.""" - return True diff --git a/supervisor/security/const.py b/supervisor/security/const.py deleted file mode 100644 index ad875e6fa..000000000 --- a/supervisor/security/const.py +++ /dev/null @@ -1,24 +0,0 @@ -"""Security constants.""" - -from enum import StrEnum - -import attr - - -class ContentTrustResult(StrEnum): - """Content trust result enum.""" - - PASS = "pass" - ERROR = "error" - FAILED = "failed" - UNTESTED = "untested" - - -@attr.s -class IntegrityResult: - """Result of a full integrity check.""" - - supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED) - core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED) - plugins: dict[str, ContentTrustResult] = attr.ib(default={}) - addons: dict[str, ContentTrustResult] = attr.ib(default={}) diff --git a/supervisor/security/module.py b/supervisor/security/module.py index f09a52dc0..f2f622fd3 100644 --- a/supervisor/security/module.py +++ b/supervisor/security/module.py @@ -4,27 +4,12 @@ from __future__ import annotations import logging -from ..const import ( - ATTR_CONTENT_TRUST, - ATTR_FORCE_SECURITY, - ATTR_PWNED, - FILE_HASSIO_SECURITY, -) +from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED, FILE_HASSIO_SECURITY from ..coresys import CoreSys, CoreSysAttributes -from ..exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, - PwnedError, - SecurityJobError, -) -from ..jobs.const import JobConcurrency -from ..jobs.decorator import Job, JobCondition -from ..resolution.const import ContextType, IssueType, SuggestionType -from ..utils.codenotary import cas_validate +from ..exceptions import PwnedError from ..utils.common import FileConfiguration from ..utils.pwned import check_pwned_password from ..validate import SCHEMA_SECURITY_CONFIG -from .const import ContentTrustResult, IntegrityResult _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -37,16 +22,6 @@ class Security(FileConfiguration, CoreSysAttributes): super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG) self.coresys = coresys - @property - def content_trust(self) -> bool: - """Return if content trust is enabled/disabled.""" - return self._data[ATTR_CONTENT_TRUST] - - @content_trust.setter - def content_trust(self, value: bool) -> None: - """Set content trust is enabled/disabled.""" - self._data[ATTR_CONTENT_TRUST] = value - @property def force(self) -> bool: """Return if force security is enabled/disabled.""" @@ -67,30 +42,6 @@ class Security(FileConfiguration, CoreSysAttributes): """Set pwned is enabled/disabled.""" self._data[ATTR_PWNED] = value - async def verify_content(self, signer: str, checksum: str) -> None: - """Verify content on CAS.""" - if not self.content_trust: - _LOGGER.warning("Disabled content-trust, skip validation") - return - - try: - await cas_validate(signer, checksum) - except CodeNotaryUntrusted: - raise - except CodeNotaryError: - if self.force: - raise - self.sys_resolution.create_issue( - IssueType.TRUST, - ContextType.SYSTEM, - suggestions=[SuggestionType.EXECUTE_INTEGRITY], - ) - return - - async def verify_own_content(self, checksum: str) -> None: - """Verify content from HA org.""" - return await self.verify_content("notary@home-assistant.io", checksum) - async def verify_secret(self, pwned_hash: str) -> None: """Verify pwned state of a secret.""" if not self.pwned: @@ -103,73 +54,3 @@ class Security(FileConfiguration, CoreSysAttributes): if self.force: raise return - - @Job( - name="security_manager_integrity_check", - conditions=[JobCondition.INTERNET_SYSTEM], - on_condition=SecurityJobError, - concurrency=JobConcurrency.REJECT, - ) - async def integrity_check(self) -> IntegrityResult: - """Run a full system integrity check of the platform. - - We only allow to install trusted content. - This is a out of the band manual check. - """ - result: IntegrityResult = IntegrityResult() - if not self.content_trust: - _LOGGER.warning( - "Skipping integrity check, content_trust is globally disabled" - ) - return result - - # Supervisor - try: - await self.sys_supervisor.check_trust() - result.supervisor = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.supervisor = ContentTrustResult.ERROR - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) - except CodeNotaryError: - result.supervisor = ContentTrustResult.FAILED - - # Core - try: - await self.sys_homeassistant.core.check_trust() - result.core = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.core = ContentTrustResult.ERROR - self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE) - except CodeNotaryError: - result.core = ContentTrustResult.FAILED - - # Plugins - for plugin in self.sys_plugins.all_plugins: - try: - await plugin.check_trust() - result.plugins[plugin.slug] = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.plugins[plugin.slug] = ContentTrustResult.ERROR - self.sys_resolution.create_issue( - IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug - ) - except CodeNotaryError: - result.plugins[plugin.slug] = ContentTrustResult.FAILED - - # Add-ons - for addon in self.sys_addons.installed: - if not addon.signed: - result.addons[addon.slug] = ContentTrustResult.UNTESTED - continue - try: - await addon.check_trust() - result.addons[addon.slug] = ContentTrustResult.PASS - except CodeNotaryUntrusted: - result.addons[addon.slug] = ContentTrustResult.ERROR - self.sys_resolution.create_issue( - IssueType.TRUST, ContextType.ADDON, reference=addon.slug - ) - except CodeNotaryError: - result.addons[addon.slug] = ContentTrustResult.FAILED - - return result diff --git a/supervisor/supervisor.py b/supervisor/supervisor.py index 8b638f369..c5c3c7a04 100644 --- a/supervisor/supervisor.py +++ b/supervisor/supervisor.py @@ -25,8 +25,6 @@ from .coresys import CoreSys, CoreSysAttributes from .docker.stats import DockerStats from .docker.supervisor import DockerSupervisor from .exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, DockerError, HostAppArmorError, SupervisorAppArmorError, @@ -37,7 +35,6 @@ from .exceptions import ( from .jobs.const import JobCondition, JobThrottle from .jobs.decorator import Job from .resolution.const import ContextType, IssueType, UnhealthyReason -from .utils.codenotary import calc_checksum from .utils.sentry import async_capture_exception _LOGGER: logging.Logger = logging.getLogger(__name__) @@ -150,20 +147,6 @@ class Supervisor(CoreSysAttributes): _LOGGER.error, ) from err - # Validate - try: - await self.sys_security.verify_own_content(calc_checksum(data)) - except CodeNotaryUntrusted as err: - raise SupervisorAppArmorError( - "Content-Trust is broken for the AppArmor profile fetch!", - _LOGGER.critical, - ) from err - except CodeNotaryError as err: - raise SupervisorAppArmorError( - f"CodeNotary error while processing AppArmor fetch: {err!s}", - _LOGGER.error, - ) from err - # Load temp_dir: TemporaryDirectory | None = None @@ -273,13 +256,6 @@ class Supervisor(CoreSysAttributes): """ return self.instance.logs() - def check_trust(self) -> Awaitable[None]: - """Calculate Supervisor docker content trust. - - Return Coroutine. - """ - return self.instance.check_trust() - async def stats(self) -> DockerStats: """Return stats of Supervisor.""" try: diff --git a/supervisor/updater.py b/supervisor/updater.py index a4b837e46..17bffe849 100644 --- a/supervisor/updater.py +++ b/supervisor/updater.py @@ -31,14 +31,8 @@ from .const import ( UpdateChannel, ) from .coresys import CoreSys, CoreSysAttributes -from .exceptions import ( - CodeNotaryError, - CodeNotaryUntrusted, - UpdaterError, - UpdaterJobError, -) +from .exceptions import UpdaterError, UpdaterJobError from .jobs.decorator import Job, JobCondition -from .utils.codenotary import calc_checksum from .utils.common import FileConfiguration from .validate import SCHEMA_UPDATER_CONFIG @@ -289,19 +283,6 @@ class Updater(FileConfiguration, CoreSysAttributes): self.sys_bus.remove_listener(self._connectivity_listener) self._connectivity_listener = None - # Validate - try: - await self.sys_security.verify_own_content(calc_checksum(data)) - except CodeNotaryUntrusted as err: - raise UpdaterError( - "Content-Trust is broken for the version file fetch!", _LOGGER.critical - ) from err - except CodeNotaryError as err: - raise UpdaterError( - f"CodeNotary error while processing version fetch: {err!s}", - _LOGGER.error, - ) from err - # Parse data try: data = json.loads(data) diff --git a/supervisor/utils/codenotary.py b/supervisor/utils/codenotary.py deleted file mode 100644 index 927d60acd..000000000 --- a/supervisor/utils/codenotary.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Small wrapper for CodeNotary.""" - -from __future__ import annotations - -import asyncio -import hashlib -import json -import logging -from pathlib import Path -import shlex -from typing import Final - -from dirhash import dirhash - -from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted -from . import clean_env - -_LOGGER: logging.Logger = logging.getLogger(__name__) - -_CAS_CMD: str = ( - "cas authenticate --signerID {signer} --silent --output json --hash {sum}" -) -_CACHE: set[tuple[str, str]] = set() - - -_ATTR_ERROR: Final = "error" -_ATTR_STATUS: Final = "status" -_FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue" - - -def calc_checksum(data: str | bytes) -> str: - """Generate checksum for CodeNotary.""" - if isinstance(data, str): - return hashlib.sha256(data.encode()).hexdigest() - return hashlib.sha256(data).hexdigest() - - -def calc_checksum_path_sourcecode(folder: Path) -> str: - """Calculate checksum for a path source code. - - Need catch OSError. - """ - return dirhash(folder.as_posix(), "sha256", match=["*.py"]) - - -# pylint: disable=unreachable -async def cas_validate( - signer: str, - checksum: str, -) -> None: - """Validate data against CodeNotary.""" - return - if (checksum, signer) in _CACHE: - return - - # Generate command for request - command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum)) - - # Request notary authorization - _LOGGER.debug("Send cas command: %s", command) - try: - proc = await asyncio.create_subprocess_exec( - *command, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=clean_env(), - ) - - async with asyncio.timeout(15): - data, error = await proc.communicate() - except TimeoutError: - raise CodeNotaryBackendError( - "Timeout while processing CodeNotary", _LOGGER.warning - ) from None - except OSError as err: - raise CodeNotaryError( - f"CodeNotary fatal error: {err!s}", _LOGGER.critical - ) from err - - # Check if Notarized - if proc.returncode != 0 and not data: - if error: - try: - error = error.decode("utf-8") - except UnicodeDecodeError as err: - raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err - if "not notarized" in error: - raise CodeNotaryUntrusted() - else: - error = _FALLBACK_ERROR - raise CodeNotaryBackendError(error, _LOGGER.warning) - - # Parse data - try: - data_json = json.loads(data) - _LOGGER.debug("CodeNotary response with: %s", data_json) - except (json.JSONDecodeError, UnicodeDecodeError) as err: - raise CodeNotaryError( - f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error - ) from err - - if _ATTR_ERROR in data_json: - raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning) - - if data_json[_ATTR_STATUS] == 0: - _CACHE.add((checksum, signer)) - else: - raise CodeNotaryUntrusted() diff --git a/supervisor/validate.py b/supervisor/validate.py index 8a20ff9b0..1030d3e06 100644 --- a/supervisor/validate.py +++ b/supervisor/validate.py @@ -12,7 +12,6 @@ from .const import ( ATTR_AUTO_UPDATE, ATTR_CHANNEL, ATTR_CLI, - ATTR_CONTENT_TRUST, ATTR_COUNTRY, ATTR_DEBUG, ATTR_DEBUG_BLOCK, @@ -229,7 +228,6 @@ SCHEMA_INGRESS_CONFIG = vol.Schema( # pylint: disable=no-value-for-parameter SCHEMA_SECURITY_CONFIG = vol.Schema( { - vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(), vol.Optional(ATTR_PWNED, default=True): vol.Boolean(), vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(), }, diff --git a/tests/api/test_security.py b/tests/api/test_security.py index 2ef5d5313..d0d3f28a9 100644 --- a/tests/api/test_security.py +++ b/tests/api/test_security.py @@ -17,16 +17,6 @@ async def test_api_security_options_force_security(api_client, coresys: CoreSys) assert coresys.security.force -@pytest.mark.asyncio -async def test_api_security_options_content_trust(api_client, coresys: CoreSys): - """Test security options content trust.""" - assert coresys.security.content_trust - - await api_client.post("/security/options", json={"content_trust": False}) - - assert not coresys.security.content_trust - - @pytest.mark.asyncio async def test_api_security_options_pwned(api_client, coresys: CoreSys): """Test security options pwned.""" @@ -41,11 +31,8 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys): async def test_api_integrity_check( api_client, coresys: CoreSys, supervisor_internet: AsyncMock ): - """Test security integrity check.""" - coresys.security.content_trust = False - + """Test security integrity check - now deprecated.""" resp = await api_client.post("/security/integrity") - result = await resp.json() - assert result["data"]["core"] == "untested" - assert result["data"]["supervisor"] == "untested" + # CodeNotary integrity check has been removed, should return 410 Gone + assert resp.status == 410 diff --git a/tests/docker/test_interface.py b/tests/docker/test_interface.py index 321d1db0f..8bdee78bb 100644 --- a/tests/docker/test_interface.py +++ b/tests/docker/test_interface.py @@ -31,15 +31,6 @@ from supervisor.jobs import JobSchedulerOptions, SupervisorJob from tests.common import load_json_fixture -@pytest.fixture(autouse=True) -def mock_verify_content(coresys: CoreSys): - """Mock verify_content utility during tests.""" - with patch.object( - coresys.security, "verify_content", return_value=None - ) as verify_content: - yield verify_content - - @pytest.mark.parametrize( "cpu_arch, platform", [ diff --git a/tests/misc/test_tasks.py b/tests/misc/test_tasks.py index ff787b9be..32a0e8445 100644 --- a/tests/misc/test_tasks.py +++ b/tests/misc/test_tasks.py @@ -181,7 +181,6 @@ async def test_reload_updater_triggers_supervisor_update( """Test an updater reload triggers a supervisor update if there is one.""" coresys.hardware.disk.get_disk_free_space = lambda x: 5000 await coresys.core.set_state(CoreState.RUNNING) - coresys.security.content_trust = False with ( patch.object( diff --git a/tests/plugins/test_plugin_base.py b/tests/plugins/test_plugin_base.py index c47144a5b..0e1ba0b5b 100644 --- a/tests/plugins/test_plugin_base.py +++ b/tests/plugins/test_plugin_base.py @@ -17,7 +17,6 @@ from supervisor.exceptions import ( AudioJobError, CliError, CliJobError, - CodeNotaryUntrusted, CoreDNSError, CoreDNSJobError, DockerError, @@ -337,14 +336,12 @@ async def test_repair_failed( patch.object( DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64) ), - patch( - "supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted - ), + patch.object(DockerInterface, "install", side_effect=DockerError), ): await plugin.repair() capture_exception.assert_called_once() - assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted) + assert check_exception_chain(capture_exception.call_args[0][0], DockerError) @pytest.mark.parametrize( diff --git a/tests/resolution/check/test_check.py b/tests/resolution/check/test_check.py index c582f732e..9793eb4dd 100644 --- a/tests/resolution/check/test_check.py +++ b/tests/resolution/check/test_check.py @@ -51,7 +51,6 @@ async def test_if_check_make_issue(coresys: CoreSys): """Test check for setup.""" free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM) await coresys.core.set_state(CoreState.RUNNING) - coresys.security.content_trust = False with patch("shutil.disk_usage", return_value=(1, 1, 1)): await coresys.resolution.check.check_system() @@ -63,7 +62,6 @@ async def test_if_check_cleanup_issue(coresys: CoreSys): """Test check for setup.""" free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM) await coresys.core.set_state(CoreState.RUNNING) - coresys.security.content_trust = False with patch("shutil.disk_usage", return_value=(1, 1, 1)): await coresys.resolution.check.check_system() diff --git a/tests/resolution/check/test_check_supervisor_trust.py b/tests/resolution/check/test_check_supervisor_trust.py deleted file mode 100644 index 54410cef1..000000000 --- a/tests/resolution/check/test_check_supervisor_trust.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Test Check Supervisor trust.""" - -# pylint: disable=import-error,protected-access -from unittest.mock import AsyncMock, patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust -from supervisor.resolution.const import IssueType, UnhealthyReason - - -async def test_base(coresys: CoreSys): - """Test check basics.""" - supervisor_trust = CheckSupervisorTrust(coresys) - assert supervisor_trust.slug == "supervisor_trust" - assert supervisor_trust.enabled - - -async def test_check(coresys: CoreSys): - """Test check.""" - supervisor_trust = CheckSupervisorTrust(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - assert len(coresys.resolution.issues) == 0 - - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) - await supervisor_trust.run_check() - assert coresys.supervisor.check_trust.called - - coresys.supervisor.check_trust = AsyncMock(return_value=None) - await supervisor_trust.run_check() - assert coresys.supervisor.check_trust.called - - assert len(coresys.resolution.issues) == 0 - - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - await supervisor_trust.run_check() - assert coresys.supervisor.check_trust.called - - assert len(coresys.resolution.issues) == 1 - assert coresys.resolution.issues[-1].type == IssueType.TRUST - - assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy - - -async def test_approve(coresys: CoreSys): - """Test check.""" - supervisor_trust = CheckSupervisorTrust(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - assert await supervisor_trust.approve_check() - - coresys.supervisor.check_trust = AsyncMock(return_value=None) - assert not await supervisor_trust.approve_check() - - -async def test_with_global_disable(coresys: CoreSys, caplog): - """Test when pwned is globally disabled.""" - coresys.security.content_trust = False - supervisor_trust = CheckSupervisorTrust(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - assert len(coresys.resolution.issues) == 0 - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) - await supervisor_trust.run_check() - assert not coresys.security.verify_own_content.called - assert ( - "Skipping supervisor_trust, content_trust is globally disabled" in caplog.text - ) - - -async def test_did_run(coresys: CoreSys): - """Test that the check ran as expected.""" - supervisor_trust = CheckSupervisorTrust(coresys) - should_run = supervisor_trust.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check", - return_value=None, - ) as check: - for state in should_run: - await coresys.core.set_state(state) - await supervisor_trust() - check.assert_called_once() - check.reset_mock() - - for state in should_not_run: - await coresys.core.set_state(state) - await supervisor_trust() - check.assert_not_called() - check.reset_mock() diff --git a/tests/resolution/evaluation/test_evaluate_content_trust.py b/tests/resolution/evaluation/test_evaluate_content_trust.py deleted file mode 100644 index 8dd67a9af..000000000 --- a/tests/resolution/evaluation/test_evaluate_content_trust.py +++ /dev/null @@ -1,46 +0,0 @@ -"""Test evaluation base.""" - -# pylint: disable=import-error,protected-access -from unittest.mock import patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust - - -async def test_evaluation(coresys: CoreSys): - """Test evaluation.""" - job_conditions = EvaluateContentTrust(coresys) - await coresys.core.set_state(CoreState.SETUP) - - await job_conditions() - assert job_conditions.reason not in coresys.resolution.unsupported - - coresys.security.content_trust = False - await job_conditions() - assert job_conditions.reason in coresys.resolution.unsupported - - -async def test_did_run(coresys: CoreSys): - """Test that the evaluation ran as expected.""" - job_conditions = EvaluateContentTrust(coresys) - should_run = job_conditions.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.evaluations.content_trust.EvaluateContentTrust.evaluate", - return_value=None, - ) as evaluate: - for state in should_run: - await coresys.core.set_state(state) - await job_conditions() - evaluate.assert_called_once() - evaluate.reset_mock() - - for state in should_not_run: - await coresys.core.set_state(state) - await job_conditions() - evaluate.assert_not_called() - evaluate.reset_mock() diff --git a/tests/resolution/evaluation/test_evaluate_source_mods.py b/tests/resolution/evaluation/test_evaluate_source_mods.py deleted file mode 100644 index 084b56baf..000000000 --- a/tests/resolution/evaluation/test_evaluate_source_mods.py +++ /dev/null @@ -1,89 +0,0 @@ -"""Test evaluation base.""" - -# pylint: disable=import-error,protected-access -import errno -import os -from pathlib import Path -from unittest.mock import AsyncMock, patch - -from supervisor.const import CoreState -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.resolution.const import ContextType, IssueType -from supervisor.resolution.data import Issue -from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods - - -async def test_evaluation(coresys: CoreSys): - """Test evaluation.""" - with patch( - "supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE", - Path(f"{os.getcwd()}/supervisor"), - ): - sourcemods = EvaluateSourceMods(coresys) - await coresys.core.set_state(CoreState.RUNNING) - - assert sourcemods.reason not in coresys.resolution.unsupported - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) - await sourcemods() - assert sourcemods.reason in coresys.resolution.unsupported - - coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError) - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - - coresys.security.verify_own_content = AsyncMock() - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - - -async def test_did_run(coresys: CoreSys): - """Test that the evaluation ran as expected.""" - sourcemods = EvaluateSourceMods(coresys) - should_run = sourcemods.states - should_not_run = [state for state in CoreState if state not in should_run] - assert len(should_run) != 0 - assert len(should_not_run) != 0 - - with patch( - "supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.evaluate", - return_value=None, - ) as evaluate: - for state in should_run: - await coresys.core.set_state(state) - await sourcemods() - evaluate.assert_called_once() - evaluate.reset_mock() - - for state in should_not_run: - await coresys.core.set_state(state) - await sourcemods() - evaluate.assert_not_called() - evaluate.reset_mock() - - -async def test_evaluation_error(coresys: CoreSys): - """Test error reading file during evaluation.""" - sourcemods = EvaluateSourceMods(coresys) - await coresys.core.set_state(CoreState.RUNNING) - corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM) - - assert sourcemods.reason not in coresys.resolution.unsupported - assert corrupt_fs not in coresys.resolution.issues - - with patch( - "supervisor.utils.codenotary.dirhash", - side_effect=(err := OSError()), - ): - err.errno = errno.EBUSY - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - assert corrupt_fs in coresys.resolution.issues - assert coresys.core.healthy is True - - coresys.resolution.dismiss_issue(corrupt_fs) - err.errno = errno.EBADMSG - await sourcemods() - assert sourcemods.reason not in coresys.resolution.unsupported - assert corrupt_fs in coresys.resolution.issues - assert coresys.core.healthy is False diff --git a/tests/resolution/fixup/test_system_execute_integrity.py b/tests/resolution/fixup/test_system_execute_integrity.py deleted file mode 100644 index 766345c6c..000000000 --- a/tests/resolution/fixup/test_system_execute_integrity.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Test evaluation base.""" - -# pylint: disable=import-error,protected-access -from datetime import timedelta -from unittest.mock import AsyncMock - -import time_machine - -from supervisor.coresys import CoreSys -from supervisor.resolution.const import ContextType, IssueType, SuggestionType -from supervisor.resolution.data import Issue, Suggestion -from supervisor.resolution.fixups.system_execute_integrity import ( - FixupSystemExecuteIntegrity, -) -from supervisor.security.const import ContentTrustResult, IntegrityResult -from supervisor.utils.dt import utcnow - - -async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock): - """Test fixup.""" - system_execute_integrity = FixupSystemExecuteIntegrity(coresys) - - assert system_execute_integrity.auto - - coresys.resolution.add_suggestion( - Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM) - ) - coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM)) - - coresys.security.integrity_check = AsyncMock( - return_value=IntegrityResult( - ContentTrustResult.PASS, - ContentTrustResult.PASS, - {"audio": ContentTrustResult.PASS}, - ) - ) - - await system_execute_integrity() - - assert coresys.security.integrity_check.called - assert len(coresys.resolution.suggestions) == 0 - assert len(coresys.resolution.issues) == 0 - - -async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock): - """Test fixup.""" - system_execute_integrity = FixupSystemExecuteIntegrity(coresys) - - assert system_execute_integrity.auto - - coresys.resolution.add_suggestion( - Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM) - ) - coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM)) - - coresys.security.integrity_check = AsyncMock( - return_value=IntegrityResult( - ContentTrustResult.FAILED, - ContentTrustResult.PASS, - {"audio": ContentTrustResult.PASS}, - ) - ) - - with time_machine.travel(utcnow() + timedelta(hours=24)): - await system_execute_integrity() - - assert coresys.security.integrity_check.called - assert len(coresys.resolution.suggestions) == 1 - assert len(coresys.resolution.issues) == 1 diff --git a/tests/resolution/test_evaluation.py b/tests/resolution/test_evaluation.py index 7e0f0f23d..d09672177 100644 --- a/tests/resolution/test_evaluation.py +++ b/tests/resolution/test_evaluation.py @@ -1,21 +1,15 @@ """Test evaluations.""" -from unittest.mock import Mock, patch +from unittest.mock import Mock from supervisor.const import CoreState from supervisor.coresys import CoreSys -from supervisor.utils import check_exception_chain async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock): """Test error while evaluating system.""" await coresys.core.set_state(CoreState.RUNNING) - with patch( - "supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode", - side_effect=RuntimeError, - ): - await coresys.resolution.evaluate.evaluate_system() + await coresys.resolution.evaluate.evaluate_system() - capture_exception.assert_called_once() - assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError) + capture_exception.assert_not_called() diff --git a/tests/security/test_module.py b/tests/security/test_module.py deleted file mode 100644 index ece486055..000000000 --- a/tests/security/test_module.py +++ /dev/null @@ -1,127 +0,0 @@ -"""Testing handling with Security.""" - -from unittest.mock import AsyncMock, patch - -import pytest - -from supervisor.coresys import CoreSys -from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted -from supervisor.security.const import ContentTrustResult - - -async def test_content_trust(coresys: CoreSys): - """Test Content-Trust.""" - - with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") - - with patch( - "supervisor.security.module.cas_validate", AsyncMock() - ) as cas_validate: - await coresys.security.verify_own_content("ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with( - "notary@home-assistant.io", "ffffffffffffff" - ) - - -async def test_disabled_content_trust(coresys: CoreSys): - """Test Content-Trust.""" - coresys.security.content_trust = False - - with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert not cas_validate.called - - with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: - await coresys.security.verify_own_content("ffffffffffffff") - assert not cas_validate.called - - -async def test_force_content_trust(coresys: CoreSys): - """Force Content-Trust tests.""" - - with patch( - "supervisor.security.module.cas_validate", - AsyncMock(side_effect=CodeNotaryError), - ) as cas_validate: - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - assert cas_validate.called - cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") - - coresys.security.force = True - - with ( - patch( - "supervisor.security.module.cas_validate", - AsyncMock(side_effect=CodeNotaryError), - ) as cas_validate, - pytest.raises(CodeNotaryError), - ): - await coresys.security.verify_content("test@mail.com", "ffffffffffffff") - - -async def test_integrity_check_disabled(coresys: CoreSys): - """Test integrity check with disabled content trust.""" - coresys.security.content_trust = False - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.UNTESTED - assert result.supervisor == ContentTrustResult.UNTESTED - - -async def test_integrity_check(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust.""" - coresys.homeassistant.core.check_trust = AsyncMock() - coresys.supervisor.check_trust = AsyncMock() - install_addon_ssh.check_trust = AsyncMock() - install_addon_ssh.data["codenotary"] = "test@example.com" - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.PASS - assert result.supervisor == ContentTrustResult.PASS - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS - - -async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust issues.""" - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) - install_addon_ssh.data["codenotary"] = "test@example.com" - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.ERROR - assert result.supervisor == ContentTrustResult.ERROR - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR - - -async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust failed.""" - coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError) - coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) - install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError) - install_addon_ssh.data["codenotary"] = "test@example.com" - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.FAILED - assert result.supervisor == ContentTrustResult.FAILED - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED - - -async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh): - """Test integrity check with content trust but no signed add-ons.""" - coresys.homeassistant.core.check_trust = AsyncMock() - coresys.supervisor.check_trust = AsyncMock() - - result = await coresys.security.integrity_check.__wrapped__(coresys.security) - - assert result.core == ContentTrustResult.PASS - assert result.supervisor == ContentTrustResult.PASS - assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED diff --git a/tests/test_updater.py b/tests/test_updater.py index b1cf21bb7..6fcd5e5d2 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -86,10 +86,9 @@ async def test_os_update_path( """Test OS upgrade path across major versions.""" coresys.os._board = "rpi4" # pylint: disable=protected-access coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access - with patch.object(type(coresys.security), "verify_own_content"): - await coresys.updater.fetch_data() + await coresys.updater.fetch_data() - assert coresys.updater.version_hassos == AwesomeVersion(expected) + assert coresys.updater.version_hassos == AwesomeVersion(expected) @pytest.mark.usefixtures("no_job_throttle") @@ -105,7 +104,6 @@ async def test_delayed_fetch_for_connectivity( load_binary_fixture("version_stable.json") ) coresys.websession.head = AsyncMock() - coresys.security.verify_own_content = AsyncMock() # Network connectivity change causes a series of async tasks to eventually do a version fetch # Rather then use some kind of sleep loop, set up listener for start of fetch data job diff --git a/tests/utils/test_codenotary.py b/tests/utils/test_codenotary.py deleted file mode 100644 index fa1b6b5a7..000000000 --- a/tests/utils/test_codenotary.py +++ /dev/null @@ -1,128 +0,0 @@ -"""Test CodeNotary.""" - -from __future__ import annotations - -from dataclasses import dataclass -from unittest.mock import AsyncMock, Mock, patch - -import pytest - -from supervisor.exceptions import ( - CodeNotaryBackendError, - CodeNotaryError, - CodeNotaryUntrusted, -) -from supervisor.utils.codenotary import calc_checksum, cas_validate - -pytest.skip("code notary has been disabled due to issues", allow_module_level=True) - - -@dataclass -class SubprocessResponse: - """Class for specifying subprocess exec response.""" - - returncode: int = 0 - data: str = "" - error: str | None = None - exception: Exception | None = None - - -@pytest.fixture(name="subprocess_exec") -def fixture_subprocess_exec(request): - """Mock subprocess exec with specific return.""" - response = request.param - if response.exception: - communicate_return = AsyncMock(side_effect=response.exception) - else: - communicate_return = AsyncMock(return_value=(response.data, response.error)) - - exec_return = Mock(returncode=response.returncode, communicate=communicate_return) - - with patch( - "supervisor.utils.codenotary.asyncio.create_subprocess_exec", - return_value=exec_return, - ) as subprocess_exec: - yield subprocess_exec - - -def test_checksum_calc(): - """Calc Checkusm as test.""" - assert calc_checksum("test") == calc_checksum(b"test") - assert ( - calc_checksum("test") - == "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" - ) - - -async def test_valid_checksum(): - """Test a valid autorization.""" - await cas_validate( - "notary@home-assistant.io", - "4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0", - ) - - -async def test_invalid_checksum(): - """Test a invalid autorization.""" - with pytest.raises(CodeNotaryUntrusted): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", - [SubprocessResponse(returncode=1, error=b"x is not notarized")], -) -async def test_not_notarized_error(subprocess_exec): - """Test received a not notarized error response from command.""" - with pytest.raises(CodeNotaryUntrusted): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", - [ - SubprocessResponse(returncode=1, error=b"test"), - SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'), - SubprocessResponse(returncode=1, error="test".encode("utf-16")), - ], - indirect=True, -) -async def test_cas_backend_error(subprocess_exec): - """Test backend error executing cas command.""" - with pytest.raises(CodeNotaryBackendError): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", - [SubprocessResponse(returncode=0, data='{"status":1}')], - indirect=True, -) -async def test_cas_notarized_untrusted(subprocess_exec): - """Test cas found notarized but untrusted content.""" - with pytest.raises(CodeNotaryUntrusted): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - ) - - -@pytest.mark.parametrize( - "subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True -) -async def test_cas_exec_os_error(subprocess_exec): - """Test os error attempting to execute cas command.""" - with pytest.raises(CodeNotaryError): - await cas_validate( - "notary@home-assistant.io", - "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", - )