mirror of
				https://github.com/home-assistant/supervisor.git
				synced 2025-10-25 11:39:33 +00:00 
			
		
		
		
	Compare commits
	
		
			14 Commits
		
	
	
		
			2025.10.0
			...
			remove-cod
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 8f5e4a7028 | ||
|   | 2545d6b841 | ||
|   | ba24bbd0d0 | ||
|   | 418a8cc86a | ||
|   | 7dc6be51c5 | ||
|   | d3e7df34d2 | ||
|   | 4b24255fa4 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 559b6982a3 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 301362e9e5 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | fc928d294c | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | f42aeb4937 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | fd21886de9 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | e4bb415e30 | ||
| ![dependabot[bot]](/assets/img/avatar_default.png)  | 622dda5382 | 
							
								
								
									
										2
									
								
								.github/workflows/stale.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.github/workflows/stale.yml
									
									
									
									
										vendored
									
									
								
							| @@ -9,7 +9,7 @@ jobs: | ||||
|   stale: | ||||
|     runs-on: ubuntu-latest | ||||
|     steps: | ||||
|       - uses: actions/stale@3a9db7e6a41a89f618792c92c0e97cc736e1b13f # v10.0.0 | ||||
|       - uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0 | ||||
|         with: | ||||
|           repo-token: ${{ secrets.GITHUB_TOKEN }} | ||||
|           days-before-stale: 30 | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| aiodns==3.5.0 | ||||
| aiohttp==3.12.15 | ||||
| aiohttp==3.13.0 | ||||
| atomicwrites-homeassistant==1.4.1 | ||||
| attrs==25.3.0 | ||||
| attrs==25.4.0 | ||||
| awesomeversion==25.8.0 | ||||
| blockbuster==1.5.25 | ||||
| brotli==1.1.0 | ||||
| @@ -23,8 +23,8 @@ pyudev==0.24.3 | ||||
| PyYAML==6.0.3 | ||||
| requests==2.32.5 | ||||
| securetar==2025.2.1 | ||||
| sentry-sdk==2.39.0 | ||||
| sentry-sdk==2.40.0 | ||||
| setuptools==80.9.0 | ||||
| voluptuous==0.15.2 | ||||
| dbus-fast==2.44.3 | ||||
| dbus-fast==2.44.5 | ||||
| zlib-fast==0.2.1 | ||||
|   | ||||
| @@ -2,13 +2,13 @@ astroid==3.3.11 | ||||
| coverage==7.10.7 | ||||
| mypy==1.18.2 | ||||
| pre-commit==4.3.0 | ||||
| pylint==3.3.8 | ||||
| pylint==3.3.9 | ||||
| pytest-aiohttp==1.1.0 | ||||
| pytest-asyncio==0.25.2 | ||||
| pytest-cov==7.0.0 | ||||
| pytest-timeout==2.4.0 | ||||
| pytest==8.4.2 | ||||
| ruff==0.13.2 | ||||
| ruff==0.13.3 | ||||
| time-machine==2.19.0 | ||||
| types-docker==7.1.0.20250916 | ||||
| types-pyyaml==6.0.12.20250915 | ||||
|   | ||||
| @@ -1510,13 +1510,6 @@ class Addon(AddonModel): | ||||
|         _LOGGER.info("Finished restore for add-on %s", self.slug) | ||||
|         return wait_for_start | ||||
|  | ||||
|     def check_trust(self) -> Awaitable[None]: | ||||
|         """Calculate Addon docker content trust. | ||||
|  | ||||
|         Return Coroutine. | ||||
|         """ | ||||
|         return self.instance.check_trust() | ||||
|  | ||||
|     @Job( | ||||
|         name="addon_restart_after_problem", | ||||
|         throttle_period=WATCHDOG_THROTTLE_PERIOD, | ||||
|   | ||||
| @@ -206,6 +206,12 @@ def _warn_addon_config(config: dict[str, Any]): | ||||
|             name, | ||||
|         ) | ||||
|  | ||||
|     if ATTR_CODENOTARY in config: | ||||
|         _LOGGER.warning( | ||||
|             "Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.", | ||||
|             name, | ||||
|         ) | ||||
|  | ||||
|     return config | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -1,19 +1,14 @@ | ||||
| """Init file for Supervisor Security RESTful API.""" | ||||
|  | ||||
| import asyncio | ||||
| import logging | ||||
| from typing import Any | ||||
|  | ||||
| from aiohttp import web | ||||
| import attr | ||||
| import voluptuous as vol | ||||
|  | ||||
| from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED | ||||
| from ..coresys import CoreSysAttributes | ||||
| from .utils import api_process, api_validate | ||||
|  | ||||
| _LOGGER: logging.Logger = logging.getLogger(__name__) | ||||
|  | ||||
| # pylint: disable=no-value-for-parameter | ||||
| SCHEMA_OPTIONS = vol.Schema( | ||||
|     { | ||||
| @@ -54,6 +49,9 @@ class APISecurity(CoreSysAttributes): | ||||
|  | ||||
|     @api_process | ||||
|     async def integrity_check(self, request: web.Request) -> dict[str, Any]: | ||||
|         """Run backend integrity check.""" | ||||
|         result = await asyncio.shield(self.sys_security.integrity_check()) | ||||
|         return attr.asdict(result) | ||||
|         """Run backend integrity check. | ||||
|  | ||||
|         CodeNotary integrity checking has been removed. This endpoint now returns | ||||
|         an error indicating the feature is currently non-functional. | ||||
|         """ | ||||
|         return {"error": "No integrity checking available"} | ||||
|   | ||||
| @@ -835,16 +835,6 @@ class DockerAddon(DockerInterface): | ||||
|         ): | ||||
|             self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue) | ||||
|  | ||||
|     async def _validate_trust(self, image_id: str) -> None: | ||||
|         """Validate trust of content.""" | ||||
|         if not self.addon.signed: | ||||
|             return | ||||
|  | ||||
|         checksum = image_id.partition(":")[2] | ||||
|         return await self.sys_security.verify_content( | ||||
|             cast(str, self.addon.codenotary), checksum | ||||
|         ) | ||||
|  | ||||
|     @Job( | ||||
|         name="docker_addon_hardware_events", | ||||
|         conditions=[JobCondition.OS_AGENT], | ||||
|   | ||||
| @@ -5,7 +5,7 @@ from ipaddress import IPv4Address | ||||
| import logging | ||||
| import re | ||||
|  | ||||
| from awesomeversion import AwesomeVersion, AwesomeVersionCompareException | ||||
| from awesomeversion import AwesomeVersion | ||||
| from docker.types import Mount | ||||
|  | ||||
| from ..const import LABEL_MACHINE | ||||
| @@ -244,13 +244,3 @@ class DockerHomeAssistant(DockerInterface): | ||||
|             self.image, | ||||
|             self.sys_homeassistant.version, | ||||
|         ) | ||||
|  | ||||
|     async def _validate_trust(self, image_id: str) -> None: | ||||
|         """Validate trust of content.""" | ||||
|         try: | ||||
|             if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST: | ||||
|                 return | ||||
|         except AwesomeVersionCompareException: | ||||
|             return | ||||
|  | ||||
|         await super()._validate_trust(image_id) | ||||
|   | ||||
| @@ -31,15 +31,12 @@ from ..const import ( | ||||
| ) | ||||
| from ..coresys import CoreSys | ||||
| from ..exceptions import ( | ||||
|     CodeNotaryError, | ||||
|     CodeNotaryUntrusted, | ||||
|     DockerAPIError, | ||||
|     DockerError, | ||||
|     DockerJobError, | ||||
|     DockerLogOutOfOrder, | ||||
|     DockerNotFound, | ||||
|     DockerRequestError, | ||||
|     DockerTrustError, | ||||
| ) | ||||
| from ..jobs import SupervisorJob | ||||
| from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency | ||||
| @@ -374,17 +371,7 @@ class DockerInterface(JobGroup, ABC): | ||||
|                 platform=MAP_ARCH[image_arch], | ||||
|             ) | ||||
|  | ||||
|             # Validate content | ||||
|             try: | ||||
|                 await self._validate_trust(cast(str, docker_image.id)) | ||||
|             except CodeNotaryError: | ||||
|                 with suppress(docker.errors.DockerException): | ||||
|                     await self.sys_run_in_executor( | ||||
|                         self.sys_docker.images.remove, | ||||
|                         image=f"{image}:{version!s}", | ||||
|                         force=True, | ||||
|                     ) | ||||
|                 raise | ||||
|             # CodeNotary content trust validation has been removed | ||||
|  | ||||
|             # Tag latest | ||||
|             if latest: | ||||
| @@ -411,16 +398,6 @@ class DockerInterface(JobGroup, ABC): | ||||
|             raise DockerError( | ||||
|                 f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error | ||||
|             ) from err | ||||
|         except CodeNotaryUntrusted as err: | ||||
|             raise DockerTrustError( | ||||
|                 f"Pulled image {image}:{version!s} failed on content-trust verification!", | ||||
|                 _LOGGER.critical, | ||||
|             ) from err | ||||
|         except CodeNotaryError as err: | ||||
|             raise DockerTrustError( | ||||
|                 f"Error happened on Content-Trust check for {image}:{version!s}: {err!s}", | ||||
|                 _LOGGER.error, | ||||
|             ) from err | ||||
|         finally: | ||||
|             if listener: | ||||
|                 self.sys_bus.remove_listener(listener) | ||||
| @@ -755,24 +732,3 @@ class DockerInterface(JobGroup, ABC): | ||||
|         return self.sys_run_in_executor( | ||||
|             self.sys_docker.container_run_inside, self.name, command | ||||
|         ) | ||||
|  | ||||
|     async def _validate_trust(self, image_id: str) -> None: | ||||
|         """Validate trust of content.""" | ||||
|         checksum = image_id.partition(":")[2] | ||||
|         return await self.sys_security.verify_own_content(checksum) | ||||
|  | ||||
|     @Job( | ||||
|         name="docker_interface_check_trust", | ||||
|         on_condition=DockerJobError, | ||||
|         concurrency=JobConcurrency.GROUP_REJECT, | ||||
|     ) | ||||
|     async def check_trust(self) -> None: | ||||
|         """Check trust of exists Docker image.""" | ||||
|         try: | ||||
|             image = await self.sys_run_in_executor( | ||||
|                 self.sys_docker.images.get, f"{self.image}:{self.version!s}" | ||||
|             ) | ||||
|         except (docker.errors.DockerException, requests.RequestException): | ||||
|             return | ||||
|  | ||||
|         await self._validate_trust(cast(str, image.id)) | ||||
|   | ||||
| @@ -577,21 +577,6 @@ class PwnedConnectivityError(PwnedError): | ||||
|     """Connectivity errors while checking pwned passwords.""" | ||||
|  | ||||
|  | ||||
| # util/codenotary | ||||
|  | ||||
|  | ||||
| class CodeNotaryError(HassioError): | ||||
|     """Error general with CodeNotary.""" | ||||
|  | ||||
|  | ||||
| class CodeNotaryUntrusted(CodeNotaryError): | ||||
|     """Error on untrusted content.""" | ||||
|  | ||||
|  | ||||
| class CodeNotaryBackendError(CodeNotaryError): | ||||
|     """CodeNotary backend error happening.""" | ||||
|  | ||||
|  | ||||
| # util/whoami | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -420,13 +420,6 @@ class HomeAssistantCore(JobGroup): | ||||
|         """ | ||||
|         return self.instance.logs() | ||||
|  | ||||
|     def check_trust(self) -> Awaitable[None]: | ||||
|         """Calculate HomeAssistant docker content trust. | ||||
|  | ||||
|         Return Coroutine. | ||||
|         """ | ||||
|         return self.instance.check_trust() | ||||
|  | ||||
|     async def stats(self) -> DockerStats: | ||||
|         """Return stats of Home Assistant.""" | ||||
|         try: | ||||
|   | ||||
| @@ -76,13 +76,6 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes): | ||||
|         """Return True if a task is in progress.""" | ||||
|         return self.instance.in_progress | ||||
|  | ||||
|     def check_trust(self) -> Awaitable[None]: | ||||
|         """Calculate plugin docker content trust. | ||||
|  | ||||
|         Return Coroutine. | ||||
|         """ | ||||
|         return self.instance.check_trust() | ||||
|  | ||||
|     def logs(self) -> Awaitable[bytes]: | ||||
|         """Get docker plugin logs. | ||||
|  | ||||
|   | ||||
| @@ -1,59 +0,0 @@ | ||||
| """Helpers to check supervisor trust.""" | ||||
|  | ||||
| import logging | ||||
|  | ||||
| from ...const import CoreState | ||||
| from ...coresys import CoreSys | ||||
| from ...exceptions import CodeNotaryError, CodeNotaryUntrusted | ||||
| from ..const import ContextType, IssueType, UnhealthyReason | ||||
| from .base import CheckBase | ||||
|  | ||||
| _LOGGER: logging.Logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| def setup(coresys: CoreSys) -> CheckBase: | ||||
|     """Check setup function.""" | ||||
|     return CheckSupervisorTrust(coresys) | ||||
|  | ||||
|  | ||||
| class CheckSupervisorTrust(CheckBase): | ||||
|     """CheckSystemTrust class for check.""" | ||||
|  | ||||
|     async def run_check(self) -> None: | ||||
|         """Run check if not affected by issue.""" | ||||
|         if not self.sys_security.content_trust: | ||||
|             _LOGGER.warning( | ||||
|                 "Skipping %s, content_trust is globally disabled", self.slug | ||||
|             ) | ||||
|             return | ||||
|  | ||||
|         try: | ||||
|             await self.sys_supervisor.check_trust() | ||||
|         except CodeNotaryUntrusted: | ||||
|             self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED) | ||||
|             self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) | ||||
|         except CodeNotaryError: | ||||
|             pass | ||||
|  | ||||
|     async def approve_check(self, reference: str | None = None) -> bool: | ||||
|         """Approve check if it is affected by issue.""" | ||||
|         try: | ||||
|             await self.sys_supervisor.check_trust() | ||||
|         except CodeNotaryError: | ||||
|             return True | ||||
|         return False | ||||
|  | ||||
|     @property | ||||
|     def issue(self) -> IssueType: | ||||
|         """Return a IssueType enum.""" | ||||
|         return IssueType.TRUST | ||||
|  | ||||
|     @property | ||||
|     def context(self) -> ContextType: | ||||
|         """Return a ContextType enum.""" | ||||
|         return ContextType.SUPERVISOR | ||||
|  | ||||
|     @property | ||||
|     def states(self) -> list[CoreState]: | ||||
|         """Return a list of valid states when this check can run.""" | ||||
|         return [CoreState.RUNNING, CoreState.STARTUP] | ||||
| @@ -1,14 +1,11 @@ | ||||
| """Evaluation class for Content Trust.""" | ||||
|  | ||||
| import errno | ||||
| import logging | ||||
| from pathlib import Path | ||||
|  | ||||
| from ...const import CoreState | ||||
| from ...coresys import CoreSys | ||||
| from ...exceptions import CodeNotaryError, CodeNotaryUntrusted | ||||
| from ...utils.codenotary import calc_checksum_path_sourcecode | ||||
| from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason | ||||
| from ..const import UnsupportedReason | ||||
| from .base import EvaluateBase | ||||
|  | ||||
| _SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor") | ||||
| @@ -44,29 +41,4 @@ class EvaluateSourceMods(EvaluateBase): | ||||
|             _LOGGER.warning("Disabled content-trust, skipping evaluation") | ||||
|             return False | ||||
|  | ||||
|         # Calculate sume of the sourcecode | ||||
|         try: | ||||
|             checksum = await self.sys_run_in_executor( | ||||
|                 calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE | ||||
|             ) | ||||
|         except OSError as err: | ||||
|             if err.errno == errno.EBADMSG: | ||||
|                 self.sys_resolution.add_unhealthy_reason( | ||||
|                     UnhealthyReason.OSERROR_BAD_MESSAGE | ||||
|                 ) | ||||
|  | ||||
|             self.sys_resolution.create_issue( | ||||
|                 IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM | ||||
|             ) | ||||
|             _LOGGER.error("Can't calculate checksum of source code: %s", err) | ||||
|             return False | ||||
|  | ||||
|         # Validate checksum | ||||
|         try: | ||||
|             await self.sys_security.verify_own_content(checksum) | ||||
|         except CodeNotaryUntrusted: | ||||
|             return True | ||||
|         except CodeNotaryError: | ||||
|             pass | ||||
|  | ||||
|         return False | ||||
|   | ||||
| @@ -1,67 +0,0 @@ | ||||
| """Helpers to check and fix issues with free space.""" | ||||
|  | ||||
| from datetime import timedelta | ||||
| import logging | ||||
|  | ||||
| from ...coresys import CoreSys | ||||
| from ...exceptions import ResolutionFixupError, ResolutionFixupJobError | ||||
| from ...jobs.const import JobCondition, JobThrottle | ||||
| from ...jobs.decorator import Job | ||||
| from ...security.const import ContentTrustResult | ||||
| from ..const import ContextType, IssueType, SuggestionType | ||||
| from .base import FixupBase | ||||
|  | ||||
| _LOGGER: logging.Logger = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| def setup(coresys: CoreSys) -> FixupBase: | ||||
|     """Check setup function.""" | ||||
|     return FixupSystemExecuteIntegrity(coresys) | ||||
|  | ||||
|  | ||||
| class FixupSystemExecuteIntegrity(FixupBase): | ||||
|     """Storage class for fixup.""" | ||||
|  | ||||
|     @Job( | ||||
|         name="fixup_system_execute_integrity_process", | ||||
|         conditions=[JobCondition.INTERNET_SYSTEM], | ||||
|         on_condition=ResolutionFixupJobError, | ||||
|         throttle_period=timedelta(hours=8), | ||||
|         throttle=JobThrottle.THROTTLE, | ||||
|     ) | ||||
|     async def process_fixup(self, reference: str | None = None) -> None: | ||||
|         """Initialize the fixup class.""" | ||||
|         result = await self.sys_security.integrity_check() | ||||
|  | ||||
|         if ContentTrustResult.FAILED in (result.core, result.supervisor): | ||||
|             raise ResolutionFixupError() | ||||
|  | ||||
|         for plugin in result.plugins: | ||||
|             if plugin != ContentTrustResult.FAILED: | ||||
|                 continue | ||||
|             raise ResolutionFixupError() | ||||
|  | ||||
|         for addon in result.addons: | ||||
|             if addon != ContentTrustResult.FAILED: | ||||
|                 continue | ||||
|             raise ResolutionFixupError() | ||||
|  | ||||
|     @property | ||||
|     def suggestion(self) -> SuggestionType: | ||||
|         """Return a SuggestionType enum.""" | ||||
|         return SuggestionType.EXECUTE_INTEGRITY | ||||
|  | ||||
|     @property | ||||
|     def context(self) -> ContextType: | ||||
|         """Return a ContextType enum.""" | ||||
|         return ContextType.SYSTEM | ||||
|  | ||||
|     @property | ||||
|     def issues(self) -> list[IssueType]: | ||||
|         """Return a IssueType enum list.""" | ||||
|         return [IssueType.TRUST] | ||||
|  | ||||
|     @property | ||||
|     def auto(self) -> bool: | ||||
|         """Return if a fixup can be apply as auto fix.""" | ||||
|         return True | ||||
| @@ -11,20 +11,10 @@ from ..const import ( | ||||
|     FILE_HASSIO_SECURITY, | ||||
| ) | ||||
| from ..coresys import CoreSys, CoreSysAttributes | ||||
| from ..exceptions import ( | ||||
|     CodeNotaryError, | ||||
|     CodeNotaryUntrusted, | ||||
|     PwnedError, | ||||
|     SecurityJobError, | ||||
| ) | ||||
| from ..jobs.const import JobConcurrency | ||||
| from ..jobs.decorator import Job, JobCondition | ||||
| from ..resolution.const import ContextType, IssueType, SuggestionType | ||||
| from ..utils.codenotary import cas_validate | ||||
| from ..exceptions import PwnedError | ||||
| from ..utils.common import FileConfiguration | ||||
| from ..utils.pwned import check_pwned_password | ||||
| from ..validate import SCHEMA_SECURITY_CONFIG | ||||
| from .const import ContentTrustResult, IntegrityResult | ||||
|  | ||||
| _LOGGER: logging.Logger = logging.getLogger(__name__) | ||||
|  | ||||
| @@ -67,30 +57,6 @@ class Security(FileConfiguration, CoreSysAttributes): | ||||
|         """Set pwned is enabled/disabled.""" | ||||
|         self._data[ATTR_PWNED] = value | ||||
|  | ||||
|     async def verify_content(self, signer: str, checksum: str) -> None: | ||||
|         """Verify content on CAS.""" | ||||
|         if not self.content_trust: | ||||
|             _LOGGER.warning("Disabled content-trust, skip validation") | ||||
|             return | ||||
|  | ||||
|         try: | ||||
|             await cas_validate(signer, checksum) | ||||
|         except CodeNotaryUntrusted: | ||||
|             raise | ||||
|         except CodeNotaryError: | ||||
|             if self.force: | ||||
|                 raise | ||||
|             self.sys_resolution.create_issue( | ||||
|                 IssueType.TRUST, | ||||
|                 ContextType.SYSTEM, | ||||
|                 suggestions=[SuggestionType.EXECUTE_INTEGRITY], | ||||
|             ) | ||||
|             return | ||||
|  | ||||
|     async def verify_own_content(self, checksum: str) -> None: | ||||
|         """Verify content from HA org.""" | ||||
|         return await self.verify_content("notary@home-assistant.io", checksum) | ||||
|  | ||||
|     async def verify_secret(self, pwned_hash: str) -> None: | ||||
|         """Verify pwned state of a secret.""" | ||||
|         if not self.pwned: | ||||
| @@ -103,73 +69,3 @@ class Security(FileConfiguration, CoreSysAttributes): | ||||
|             if self.force: | ||||
|                 raise | ||||
|             return | ||||
|  | ||||
|     @Job( | ||||
|         name="security_manager_integrity_check", | ||||
|         conditions=[JobCondition.INTERNET_SYSTEM], | ||||
|         on_condition=SecurityJobError, | ||||
|         concurrency=JobConcurrency.REJECT, | ||||
|     ) | ||||
|     async def integrity_check(self) -> IntegrityResult: | ||||
|         """Run a full system integrity check of the platform. | ||||
|  | ||||
|         We only allow to install trusted content. | ||||
|         This is a out of the band manual check. | ||||
|         """ | ||||
|         result: IntegrityResult = IntegrityResult() | ||||
|         if not self.content_trust: | ||||
|             _LOGGER.warning( | ||||
|                 "Skipping integrity check, content_trust is globally disabled" | ||||
|             ) | ||||
|             return result | ||||
|  | ||||
|         # Supervisor | ||||
|         try: | ||||
|             await self.sys_supervisor.check_trust() | ||||
|             result.supervisor = ContentTrustResult.PASS | ||||
|         except CodeNotaryUntrusted: | ||||
|             result.supervisor = ContentTrustResult.ERROR | ||||
|             self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) | ||||
|         except CodeNotaryError: | ||||
|             result.supervisor = ContentTrustResult.FAILED | ||||
|  | ||||
|         # Core | ||||
|         try: | ||||
|             await self.sys_homeassistant.core.check_trust() | ||||
|             result.core = ContentTrustResult.PASS | ||||
|         except CodeNotaryUntrusted: | ||||
|             result.core = ContentTrustResult.ERROR | ||||
|             self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE) | ||||
|         except CodeNotaryError: | ||||
|             result.core = ContentTrustResult.FAILED | ||||
|  | ||||
|         # Plugins | ||||
|         for plugin in self.sys_plugins.all_plugins: | ||||
|             try: | ||||
|                 await plugin.check_trust() | ||||
|                 result.plugins[plugin.slug] = ContentTrustResult.PASS | ||||
|             except CodeNotaryUntrusted: | ||||
|                 result.plugins[plugin.slug] = ContentTrustResult.ERROR | ||||
|                 self.sys_resolution.create_issue( | ||||
|                     IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug | ||||
|                 ) | ||||
|             except CodeNotaryError: | ||||
|                 result.plugins[plugin.slug] = ContentTrustResult.FAILED | ||||
|  | ||||
|         # Add-ons | ||||
|         for addon in self.sys_addons.installed: | ||||
|             if not addon.signed: | ||||
|                 result.addons[addon.slug] = ContentTrustResult.UNTESTED | ||||
|                 continue | ||||
|             try: | ||||
|                 await addon.check_trust() | ||||
|                 result.addons[addon.slug] = ContentTrustResult.PASS | ||||
|             except CodeNotaryUntrusted: | ||||
|                 result.addons[addon.slug] = ContentTrustResult.ERROR | ||||
|                 self.sys_resolution.create_issue( | ||||
|                     IssueType.TRUST, ContextType.ADDON, reference=addon.slug | ||||
|                 ) | ||||
|             except CodeNotaryError: | ||||
|                 result.addons[addon.slug] = ContentTrustResult.FAILED | ||||
|  | ||||
|         return result | ||||
|   | ||||
| @@ -23,8 +23,6 @@ from .coresys import CoreSys, CoreSysAttributes | ||||
| from .docker.stats import DockerStats | ||||
| from .docker.supervisor import DockerSupervisor | ||||
| from .exceptions import ( | ||||
|     CodeNotaryError, | ||||
|     CodeNotaryUntrusted, | ||||
|     DockerError, | ||||
|     HostAppArmorError, | ||||
|     SupervisorAppArmorError, | ||||
| @@ -35,7 +33,6 @@ from .exceptions import ( | ||||
| from .jobs.const import JobCondition, JobThrottle | ||||
| from .jobs.decorator import Job | ||||
| from .resolution.const import ContextType, IssueType, UnhealthyReason | ||||
| from .utils.codenotary import calc_checksum | ||||
| from .utils.sentry import async_capture_exception | ||||
|  | ||||
| _LOGGER: logging.Logger = logging.getLogger(__name__) | ||||
| @@ -148,20 +145,6 @@ class Supervisor(CoreSysAttributes): | ||||
|                 _LOGGER.error, | ||||
|             ) from err | ||||
|  | ||||
|         # Validate | ||||
|         try: | ||||
|             await self.sys_security.verify_own_content(calc_checksum(data)) | ||||
|         except CodeNotaryUntrusted as err: | ||||
|             raise SupervisorAppArmorError( | ||||
|                 "Content-Trust is broken for the AppArmor profile fetch!", | ||||
|                 _LOGGER.critical, | ||||
|             ) from err | ||||
|         except CodeNotaryError as err: | ||||
|             raise SupervisorAppArmorError( | ||||
|                 f"CodeNotary error while processing AppArmor fetch: {err!s}", | ||||
|                 _LOGGER.error, | ||||
|             ) from err | ||||
|  | ||||
|         # Load | ||||
|         temp_dir: TemporaryDirectory | None = None | ||||
|  | ||||
| @@ -261,13 +244,6 @@ class Supervisor(CoreSysAttributes): | ||||
|         """ | ||||
|         return self.instance.logs() | ||||
|  | ||||
|     def check_trust(self) -> Awaitable[None]: | ||||
|         """Calculate Supervisor docker content trust. | ||||
|  | ||||
|         Return Coroutine. | ||||
|         """ | ||||
|         return self.instance.check_trust() | ||||
|  | ||||
|     async def stats(self) -> DockerStats: | ||||
|         """Return stats of Supervisor.""" | ||||
|         try: | ||||
|   | ||||
| @@ -31,14 +31,8 @@ from .const import ( | ||||
|     UpdateChannel, | ||||
| ) | ||||
| from .coresys import CoreSys, CoreSysAttributes | ||||
| from .exceptions import ( | ||||
|     CodeNotaryError, | ||||
|     CodeNotaryUntrusted, | ||||
|     UpdaterError, | ||||
|     UpdaterJobError, | ||||
| ) | ||||
| from .exceptions import UpdaterError, UpdaterJobError | ||||
| from .jobs.decorator import Job, JobCondition | ||||
| from .utils.codenotary import calc_checksum | ||||
| from .utils.common import FileConfiguration | ||||
| from .validate import SCHEMA_UPDATER_CONFIG | ||||
|  | ||||
| @@ -289,19 +283,6 @@ class Updater(FileConfiguration, CoreSysAttributes): | ||||
|             self.sys_bus.remove_listener(self._connectivity_listener) | ||||
|             self._connectivity_listener = None | ||||
|  | ||||
|         # Validate | ||||
|         try: | ||||
|             await self.sys_security.verify_own_content(calc_checksum(data)) | ||||
|         except CodeNotaryUntrusted as err: | ||||
|             raise UpdaterError( | ||||
|                 "Content-Trust is broken for the version file fetch!", _LOGGER.critical | ||||
|             ) from err | ||||
|         except CodeNotaryError as err: | ||||
|             raise UpdaterError( | ||||
|                 f"CodeNotary error while processing version fetch: {err!s}", | ||||
|                 _LOGGER.error, | ||||
|             ) from err | ||||
|  | ||||
|         # Parse data | ||||
|         try: | ||||
|             data = json.loads(data) | ||||
|   | ||||
| @@ -1,109 +0,0 @@ | ||||
| """Small wrapper for CodeNotary.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| import asyncio | ||||
| import hashlib | ||||
| import json | ||||
| import logging | ||||
| from pathlib import Path | ||||
| import shlex | ||||
| from typing import Final | ||||
|  | ||||
| from dirhash import dirhash | ||||
|  | ||||
| from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted | ||||
| from . import clean_env | ||||
|  | ||||
| _LOGGER: logging.Logger = logging.getLogger(__name__) | ||||
|  | ||||
| _CAS_CMD: str = ( | ||||
|     "cas authenticate --signerID {signer} --silent --output json --hash {sum}" | ||||
| ) | ||||
| _CACHE: set[tuple[str, str]] = set() | ||||
|  | ||||
|  | ||||
| _ATTR_ERROR: Final = "error" | ||||
| _ATTR_STATUS: Final = "status" | ||||
| _FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue" | ||||
|  | ||||
|  | ||||
| def calc_checksum(data: str | bytes) -> str: | ||||
|     """Generate checksum for CodeNotary.""" | ||||
|     if isinstance(data, str): | ||||
|         return hashlib.sha256(data.encode()).hexdigest() | ||||
|     return hashlib.sha256(data).hexdigest() | ||||
|  | ||||
|  | ||||
| def calc_checksum_path_sourcecode(folder: Path) -> str: | ||||
|     """Calculate checksum for a path source code. | ||||
|  | ||||
|     Need catch OSError. | ||||
|     """ | ||||
|     return dirhash(folder.as_posix(), "sha256", match=["*.py"]) | ||||
|  | ||||
|  | ||||
| # pylint: disable=unreachable | ||||
| async def cas_validate( | ||||
|     signer: str, | ||||
|     checksum: str, | ||||
| ) -> None: | ||||
|     """Validate data against CodeNotary.""" | ||||
|     return | ||||
|     if (checksum, signer) in _CACHE: | ||||
|         return | ||||
|  | ||||
|     # Generate command for request | ||||
|     command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum)) | ||||
|  | ||||
|     # Request notary authorization | ||||
|     _LOGGER.debug("Send cas command: %s", command) | ||||
|     try: | ||||
|         proc = await asyncio.create_subprocess_exec( | ||||
|             *command, | ||||
|             stdin=asyncio.subprocess.DEVNULL, | ||||
|             stdout=asyncio.subprocess.PIPE, | ||||
|             stderr=asyncio.subprocess.PIPE, | ||||
|             env=clean_env(), | ||||
|         ) | ||||
|  | ||||
|         async with asyncio.timeout(15): | ||||
|             data, error = await proc.communicate() | ||||
|     except TimeoutError: | ||||
|         raise CodeNotaryBackendError( | ||||
|             "Timeout while processing CodeNotary", _LOGGER.warning | ||||
|         ) from None | ||||
|     except OSError as err: | ||||
|         raise CodeNotaryError( | ||||
|             f"CodeNotary fatal error: {err!s}", _LOGGER.critical | ||||
|         ) from err | ||||
|  | ||||
|     # Check if Notarized | ||||
|     if proc.returncode != 0 and not data: | ||||
|         if error: | ||||
|             try: | ||||
|                 error = error.decode("utf-8") | ||||
|             except UnicodeDecodeError as err: | ||||
|                 raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err | ||||
|             if "not notarized" in error: | ||||
|                 raise CodeNotaryUntrusted() | ||||
|         else: | ||||
|             error = _FALLBACK_ERROR | ||||
|         raise CodeNotaryBackendError(error, _LOGGER.warning) | ||||
|  | ||||
|     # Parse data | ||||
|     try: | ||||
|         data_json = json.loads(data) | ||||
|         _LOGGER.debug("CodeNotary response with: %s", data_json) | ||||
|     except (json.JSONDecodeError, UnicodeDecodeError) as err: | ||||
|         raise CodeNotaryError( | ||||
|             f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error | ||||
|         ) from err | ||||
|  | ||||
|     if _ATTR_ERROR in data_json: | ||||
|         raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning) | ||||
|  | ||||
|     if data_json[_ATTR_STATUS] == 0: | ||||
|         _CACHE.add((checksum, signer)) | ||||
|     else: | ||||
|         raise CodeNotaryUntrusted() | ||||
| @@ -41,11 +41,9 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys): | ||||
| async def test_api_integrity_check( | ||||
|     api_client, coresys: CoreSys, supervisor_internet: AsyncMock | ||||
| ): | ||||
|     """Test security integrity check.""" | ||||
|     coresys.security.content_trust = False | ||||
|  | ||||
|     """Test security integrity check - now deprecated.""" | ||||
|     resp = await api_client.post("/security/integrity") | ||||
|     result = await resp.json() | ||||
|  | ||||
|     assert result["data"]["core"] == "untested" | ||||
|     assert result["data"]["supervisor"] == "untested" | ||||
|     # CodeNotary integrity check has been removed | ||||
|     assert "error" in result["data"] | ||||
|   | ||||
| @@ -32,15 +32,6 @@ from supervisor.jobs import JobSchedulerOptions, SupervisorJob | ||||
| from tests.common import load_json_fixture | ||||
|  | ||||
|  | ||||
| @pytest.fixture(autouse=True) | ||||
| def mock_verify_content(coresys: CoreSys): | ||||
|     """Mock verify_content utility during tests.""" | ||||
|     with patch.object( | ||||
|         coresys.security, "verify_content", return_value=None | ||||
|     ) as verify_content: | ||||
|         yield verify_content | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     "cpu_arch, platform", | ||||
|     [ | ||||
|   | ||||
| @@ -17,7 +17,6 @@ from supervisor.exceptions import ( | ||||
|     AudioJobError, | ||||
|     CliError, | ||||
|     CliJobError, | ||||
|     CodeNotaryUntrusted, | ||||
|     CoreDNSError, | ||||
|     CoreDNSJobError, | ||||
|     DockerError, | ||||
| @@ -337,14 +336,12 @@ async def test_repair_failed( | ||||
|         patch.object( | ||||
|             DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64) | ||||
|         ), | ||||
|         patch( | ||||
|             "supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted | ||||
|         ), | ||||
|         patch.object(DockerInterface, "install", side_effect=DockerError), | ||||
|     ): | ||||
|         await plugin.repair() | ||||
|  | ||||
|     capture_exception.assert_called_once() | ||||
|     assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted) | ||||
|     assert check_exception_chain(capture_exception.call_args[0][0], DockerError) | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|   | ||||
| @@ -1,96 +0,0 @@ | ||||
| """Test Check Supervisor trust.""" | ||||
|  | ||||
| # pylint: disable=import-error,protected-access | ||||
| from unittest.mock import AsyncMock, patch | ||||
|  | ||||
| from supervisor.const import CoreState | ||||
| from supervisor.coresys import CoreSys | ||||
| from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted | ||||
| from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust | ||||
| from supervisor.resolution.const import IssueType, UnhealthyReason | ||||
|  | ||||
|  | ||||
| async def test_base(coresys: CoreSys): | ||||
|     """Test check basics.""" | ||||
|     supervisor_trust = CheckSupervisorTrust(coresys) | ||||
|     assert supervisor_trust.slug == "supervisor_trust" | ||||
|     assert supervisor_trust.enabled | ||||
|  | ||||
|  | ||||
| async def test_check(coresys: CoreSys): | ||||
|     """Test check.""" | ||||
|     supervisor_trust = CheckSupervisorTrust(coresys) | ||||
|     await coresys.core.set_state(CoreState.RUNNING) | ||||
|  | ||||
|     assert len(coresys.resolution.issues) == 0 | ||||
|  | ||||
|     coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) | ||||
|     await supervisor_trust.run_check() | ||||
|     assert coresys.supervisor.check_trust.called | ||||
|  | ||||
|     coresys.supervisor.check_trust = AsyncMock(return_value=None) | ||||
|     await supervisor_trust.run_check() | ||||
|     assert coresys.supervisor.check_trust.called | ||||
|  | ||||
|     assert len(coresys.resolution.issues) == 0 | ||||
|  | ||||
|     coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|     await supervisor_trust.run_check() | ||||
|     assert coresys.supervisor.check_trust.called | ||||
|  | ||||
|     assert len(coresys.resolution.issues) == 1 | ||||
|     assert coresys.resolution.issues[-1].type == IssueType.TRUST | ||||
|  | ||||
|     assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy | ||||
|  | ||||
|  | ||||
| async def test_approve(coresys: CoreSys): | ||||
|     """Test check.""" | ||||
|     supervisor_trust = CheckSupervisorTrust(coresys) | ||||
|     await coresys.core.set_state(CoreState.RUNNING) | ||||
|  | ||||
|     coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|     assert await supervisor_trust.approve_check() | ||||
|  | ||||
|     coresys.supervisor.check_trust = AsyncMock(return_value=None) | ||||
|     assert not await supervisor_trust.approve_check() | ||||
|  | ||||
|  | ||||
| async def test_with_global_disable(coresys: CoreSys, caplog): | ||||
|     """Test when pwned is globally disabled.""" | ||||
|     coresys.security.content_trust = False | ||||
|     supervisor_trust = CheckSupervisorTrust(coresys) | ||||
|     await coresys.core.set_state(CoreState.RUNNING) | ||||
|  | ||||
|     assert len(coresys.resolution.issues) == 0 | ||||
|     coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|     await supervisor_trust.run_check() | ||||
|     assert not coresys.security.verify_own_content.called | ||||
|     assert ( | ||||
|         "Skipping supervisor_trust, content_trust is globally disabled" in caplog.text | ||||
|     ) | ||||
|  | ||||
|  | ||||
| async def test_did_run(coresys: CoreSys): | ||||
|     """Test that the check ran as expected.""" | ||||
|     supervisor_trust = CheckSupervisorTrust(coresys) | ||||
|     should_run = supervisor_trust.states | ||||
|     should_not_run = [state for state in CoreState if state not in should_run] | ||||
|     assert len(should_run) != 0 | ||||
|     assert len(should_not_run) != 0 | ||||
|  | ||||
|     with patch( | ||||
|         "supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check", | ||||
|         return_value=None, | ||||
|     ) as check: | ||||
|         for state in should_run: | ||||
|             await coresys.core.set_state(state) | ||||
|             await supervisor_trust() | ||||
|             check.assert_called_once() | ||||
|             check.reset_mock() | ||||
|  | ||||
|         for state in should_not_run: | ||||
|             await coresys.core.set_state(state) | ||||
|             await supervisor_trust() | ||||
|             check.assert_not_called() | ||||
|             check.reset_mock() | ||||
| @@ -1,38 +1,20 @@ | ||||
| """Test evaluation base.""" | ||||
|  | ||||
| # pylint: disable=import-error,protected-access | ||||
| import errno | ||||
| import os | ||||
| from pathlib import Path | ||||
| from unittest.mock import AsyncMock, patch | ||||
| from unittest.mock import patch | ||||
|  | ||||
| from supervisor.const import CoreState | ||||
| from supervisor.coresys import CoreSys | ||||
| from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted | ||||
| from supervisor.resolution.const import ContextType, IssueType | ||||
| from supervisor.resolution.data import Issue | ||||
| from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods | ||||
|  | ||||
|  | ||||
| async def test_evaluation(coresys: CoreSys): | ||||
|     """Test evaluation.""" | ||||
|     with patch( | ||||
|         "supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE", | ||||
|         Path(f"{os.getcwd()}/supervisor"), | ||||
|     ): | ||||
|     """Test evaluation - CodeNotary removed.""" | ||||
|     sourcemods = EvaluateSourceMods(coresys) | ||||
|     await coresys.core.set_state(CoreState.RUNNING) | ||||
|  | ||||
|     # CodeNotary checking removed, evaluation always returns False now | ||||
|     assert sourcemods.reason not in coresys.resolution.unsupported | ||||
|         coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|         await sourcemods() | ||||
|         assert sourcemods.reason in coresys.resolution.unsupported | ||||
|  | ||||
|         coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError) | ||||
|         await sourcemods() | ||||
|         assert sourcemods.reason not in coresys.resolution.unsupported | ||||
|  | ||||
|         coresys.security.verify_own_content = AsyncMock() | ||||
|     await sourcemods() | ||||
|     assert sourcemods.reason not in coresys.resolution.unsupported | ||||
|  | ||||
| @@ -63,27 +45,11 @@ async def test_did_run(coresys: CoreSys): | ||||
|  | ||||
|  | ||||
| async def test_evaluation_error(coresys: CoreSys): | ||||
|     """Test error reading file during evaluation.""" | ||||
|     """Test error reading file during evaluation - CodeNotary removed.""" | ||||
|     sourcemods = EvaluateSourceMods(coresys) | ||||
|     await coresys.core.set_state(CoreState.RUNNING) | ||||
|     corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM) | ||||
|  | ||||
|     # CodeNotary checking removed, evaluation always returns False now | ||||
|     assert sourcemods.reason not in coresys.resolution.unsupported | ||||
|     assert corrupt_fs not in coresys.resolution.issues | ||||
|  | ||||
|     with patch( | ||||
|         "supervisor.utils.codenotary.dirhash", | ||||
|         side_effect=(err := OSError()), | ||||
|     ): | ||||
|         err.errno = errno.EBUSY | ||||
|     await sourcemods() | ||||
|     assert sourcemods.reason not in coresys.resolution.unsupported | ||||
|         assert corrupt_fs in coresys.resolution.issues | ||||
|         assert coresys.core.healthy is True | ||||
|  | ||||
|         coresys.resolution.dismiss_issue(corrupt_fs) | ||||
|         err.errno = errno.EBADMSG | ||||
|         await sourcemods() | ||||
|         assert sourcemods.reason not in coresys.resolution.unsupported | ||||
|         assert corrupt_fs in coresys.resolution.issues | ||||
|         assert coresys.core.healthy is False | ||||
|   | ||||
| @@ -1,69 +0,0 @@ | ||||
| """Test evaluation base.""" | ||||
|  | ||||
| # pylint: disable=import-error,protected-access | ||||
| from datetime import timedelta | ||||
| from unittest.mock import AsyncMock | ||||
|  | ||||
| import time_machine | ||||
|  | ||||
| from supervisor.coresys import CoreSys | ||||
| from supervisor.resolution.const import ContextType, IssueType, SuggestionType | ||||
| from supervisor.resolution.data import Issue, Suggestion | ||||
| from supervisor.resolution.fixups.system_execute_integrity import ( | ||||
|     FixupSystemExecuteIntegrity, | ||||
| ) | ||||
| from supervisor.security.const import ContentTrustResult, IntegrityResult | ||||
| from supervisor.utils.dt import utcnow | ||||
|  | ||||
|  | ||||
| async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock): | ||||
|     """Test fixup.""" | ||||
|     system_execute_integrity = FixupSystemExecuteIntegrity(coresys) | ||||
|  | ||||
|     assert system_execute_integrity.auto | ||||
|  | ||||
|     coresys.resolution.add_suggestion( | ||||
|         Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM) | ||||
|     ) | ||||
|     coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM)) | ||||
|  | ||||
|     coresys.security.integrity_check = AsyncMock( | ||||
|         return_value=IntegrityResult( | ||||
|             ContentTrustResult.PASS, | ||||
|             ContentTrustResult.PASS, | ||||
|             {"audio": ContentTrustResult.PASS}, | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     await system_execute_integrity() | ||||
|  | ||||
|     assert coresys.security.integrity_check.called | ||||
|     assert len(coresys.resolution.suggestions) == 0 | ||||
|     assert len(coresys.resolution.issues) == 0 | ||||
|  | ||||
|  | ||||
| async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock): | ||||
|     """Test fixup.""" | ||||
|     system_execute_integrity = FixupSystemExecuteIntegrity(coresys) | ||||
|  | ||||
|     assert system_execute_integrity.auto | ||||
|  | ||||
|     coresys.resolution.add_suggestion( | ||||
|         Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM) | ||||
|     ) | ||||
|     coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM)) | ||||
|  | ||||
|     coresys.security.integrity_check = AsyncMock( | ||||
|         return_value=IntegrityResult( | ||||
|             ContentTrustResult.FAILED, | ||||
|             ContentTrustResult.PASS, | ||||
|             {"audio": ContentTrustResult.PASS}, | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|     with time_machine.travel(utcnow() + timedelta(hours=24)): | ||||
|         await system_execute_integrity() | ||||
|  | ||||
|     assert coresys.security.integrity_check.called | ||||
|     assert len(coresys.resolution.suggestions) == 1 | ||||
|     assert len(coresys.resolution.issues) == 1 | ||||
| @@ -1,21 +1,15 @@ | ||||
| """Test evaluations.""" | ||||
|  | ||||
| from unittest.mock import Mock, patch | ||||
| from unittest.mock import Mock | ||||
|  | ||||
| from supervisor.const import CoreState | ||||
| from supervisor.coresys import CoreSys | ||||
| from supervisor.utils import check_exception_chain | ||||
|  | ||||
|  | ||||
| async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock): | ||||
|     """Test error while evaluating system.""" | ||||
|     await coresys.core.set_state(CoreState.RUNNING) | ||||
|  | ||||
|     with patch( | ||||
|         "supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode", | ||||
|         side_effect=RuntimeError, | ||||
|     ): | ||||
|     await coresys.resolution.evaluate.evaluate_system() | ||||
|  | ||||
|     capture_exception.assert_called_once() | ||||
|     assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError) | ||||
|     capture_exception.assert_not_called() | ||||
|   | ||||
| @@ -1,127 +0,0 @@ | ||||
| """Testing handling with Security.""" | ||||
|  | ||||
| from unittest.mock import AsyncMock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from supervisor.coresys import CoreSys | ||||
| from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted | ||||
| from supervisor.security.const import ContentTrustResult | ||||
|  | ||||
|  | ||||
| async def test_content_trust(coresys: CoreSys): | ||||
|     """Test Content-Trust.""" | ||||
|  | ||||
|     with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: | ||||
|         await coresys.security.verify_content("test@mail.com", "ffffffffffffff") | ||||
|         assert cas_validate.called | ||||
|         cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") | ||||
|  | ||||
|         with patch( | ||||
|             "supervisor.security.module.cas_validate", AsyncMock() | ||||
|         ) as cas_validate: | ||||
|             await coresys.security.verify_own_content("ffffffffffffff") | ||||
|             assert cas_validate.called | ||||
|             cas_validate.assert_called_once_with( | ||||
|                 "notary@home-assistant.io", "ffffffffffffff" | ||||
|             ) | ||||
|  | ||||
|  | ||||
| async def test_disabled_content_trust(coresys: CoreSys): | ||||
|     """Test Content-Trust.""" | ||||
|     coresys.security.content_trust = False | ||||
|  | ||||
|     with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: | ||||
|         await coresys.security.verify_content("test@mail.com", "ffffffffffffff") | ||||
|         assert not cas_validate.called | ||||
|  | ||||
|     with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate: | ||||
|         await coresys.security.verify_own_content("ffffffffffffff") | ||||
|         assert not cas_validate.called | ||||
|  | ||||
|  | ||||
| async def test_force_content_trust(coresys: CoreSys): | ||||
|     """Force Content-Trust tests.""" | ||||
|  | ||||
|     with patch( | ||||
|         "supervisor.security.module.cas_validate", | ||||
|         AsyncMock(side_effect=CodeNotaryError), | ||||
|     ) as cas_validate: | ||||
|         await coresys.security.verify_content("test@mail.com", "ffffffffffffff") | ||||
|         assert cas_validate.called | ||||
|         cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff") | ||||
|  | ||||
|     coresys.security.force = True | ||||
|  | ||||
|     with ( | ||||
|         patch( | ||||
|             "supervisor.security.module.cas_validate", | ||||
|             AsyncMock(side_effect=CodeNotaryError), | ||||
|         ) as cas_validate, | ||||
|         pytest.raises(CodeNotaryError), | ||||
|     ): | ||||
|         await coresys.security.verify_content("test@mail.com", "ffffffffffffff") | ||||
|  | ||||
|  | ||||
| async def test_integrity_check_disabled(coresys: CoreSys): | ||||
|     """Test integrity check with disabled content trust.""" | ||||
|     coresys.security.content_trust = False | ||||
|  | ||||
|     result = await coresys.security.integrity_check.__wrapped__(coresys.security) | ||||
|  | ||||
|     assert result.core == ContentTrustResult.UNTESTED | ||||
|     assert result.supervisor == ContentTrustResult.UNTESTED | ||||
|  | ||||
|  | ||||
| async def test_integrity_check(coresys: CoreSys, install_addon_ssh): | ||||
|     """Test integrity check with content trust.""" | ||||
|     coresys.homeassistant.core.check_trust = AsyncMock() | ||||
|     coresys.supervisor.check_trust = AsyncMock() | ||||
|     install_addon_ssh.check_trust = AsyncMock() | ||||
|     install_addon_ssh.data["codenotary"] = "test@example.com" | ||||
|  | ||||
|     result = await coresys.security.integrity_check.__wrapped__(coresys.security) | ||||
|  | ||||
|     assert result.core == ContentTrustResult.PASS | ||||
|     assert result.supervisor == ContentTrustResult.PASS | ||||
|     assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS | ||||
|  | ||||
|  | ||||
| async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh): | ||||
|     """Test integrity check with content trust issues.""" | ||||
|     coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|     coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|     install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) | ||||
|     install_addon_ssh.data["codenotary"] = "test@example.com" | ||||
|  | ||||
|     result = await coresys.security.integrity_check.__wrapped__(coresys.security) | ||||
|  | ||||
|     assert result.core == ContentTrustResult.ERROR | ||||
|     assert result.supervisor == ContentTrustResult.ERROR | ||||
|     assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR | ||||
|  | ||||
|  | ||||
| async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh): | ||||
|     """Test integrity check with content trust failed.""" | ||||
|     coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError) | ||||
|     coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) | ||||
|     install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError) | ||||
|     install_addon_ssh.data["codenotary"] = "test@example.com" | ||||
|  | ||||
|     result = await coresys.security.integrity_check.__wrapped__(coresys.security) | ||||
|  | ||||
|     assert result.core == ContentTrustResult.FAILED | ||||
|     assert result.supervisor == ContentTrustResult.FAILED | ||||
|     assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED | ||||
|  | ||||
|  | ||||
| async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh): | ||||
|     """Test integrity check with content trust but no signed add-ons.""" | ||||
|     coresys.homeassistant.core.check_trust = AsyncMock() | ||||
|     coresys.supervisor.check_trust = AsyncMock() | ||||
|  | ||||
|     result = await coresys.security.integrity_check.__wrapped__(coresys.security) | ||||
|  | ||||
|     assert result.core == ContentTrustResult.PASS | ||||
|     assert result.supervisor == ContentTrustResult.PASS | ||||
|     assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED | ||||
| @@ -86,7 +86,7 @@ async def test_os_update_path( | ||||
|     """Test OS upgrade path across major versions.""" | ||||
|     coresys.os._board = "rpi4"  # pylint: disable=protected-access | ||||
|     coresys.os._version = AwesomeVersion(version)  # pylint: disable=protected-access | ||||
|     with patch.object(type(coresys.security), "verify_own_content"): | ||||
|     # CodeNotary verification removed | ||||
|     await coresys.updater.fetch_data() | ||||
|  | ||||
|     assert coresys.updater.version_hassos == AwesomeVersion(expected) | ||||
| @@ -105,7 +105,6 @@ async def test_delayed_fetch_for_connectivity( | ||||
|         load_binary_fixture("version_stable.json") | ||||
|     ) | ||||
|     coresys.websession.head = AsyncMock() | ||||
|     coresys.security.verify_own_content = AsyncMock() | ||||
|  | ||||
|     # Network connectivity change causes a series of async tasks to eventually do a version fetch | ||||
|     # Rather then use some kind of sleep loop, set up listener for start of fetch data job | ||||
|   | ||||
| @@ -1,128 +0,0 @@ | ||||
| """Test CodeNotary.""" | ||||
|  | ||||
| from __future__ import annotations | ||||
|  | ||||
| from dataclasses import dataclass | ||||
| from unittest.mock import AsyncMock, Mock, patch | ||||
|  | ||||
| import pytest | ||||
|  | ||||
| from supervisor.exceptions import ( | ||||
|     CodeNotaryBackendError, | ||||
|     CodeNotaryError, | ||||
|     CodeNotaryUntrusted, | ||||
| ) | ||||
| from supervisor.utils.codenotary import calc_checksum, cas_validate | ||||
|  | ||||
| pytest.skip("code notary has been disabled due to issues", allow_module_level=True) | ||||
|  | ||||
|  | ||||
| @dataclass | ||||
| class SubprocessResponse: | ||||
|     """Class for specifying subprocess exec response.""" | ||||
|  | ||||
|     returncode: int = 0 | ||||
|     data: str = "" | ||||
|     error: str | None = None | ||||
|     exception: Exception | None = None | ||||
|  | ||||
|  | ||||
| @pytest.fixture(name="subprocess_exec") | ||||
| def fixture_subprocess_exec(request): | ||||
|     """Mock subprocess exec with specific return.""" | ||||
|     response = request.param | ||||
|     if response.exception: | ||||
|         communicate_return = AsyncMock(side_effect=response.exception) | ||||
|     else: | ||||
|         communicate_return = AsyncMock(return_value=(response.data, response.error)) | ||||
|  | ||||
|     exec_return = Mock(returncode=response.returncode, communicate=communicate_return) | ||||
|  | ||||
|     with patch( | ||||
|         "supervisor.utils.codenotary.asyncio.create_subprocess_exec", | ||||
|         return_value=exec_return, | ||||
|     ) as subprocess_exec: | ||||
|         yield subprocess_exec | ||||
|  | ||||
|  | ||||
| def test_checksum_calc(): | ||||
|     """Calc Checkusm as test.""" | ||||
|     assert calc_checksum("test") == calc_checksum(b"test") | ||||
|     assert ( | ||||
|         calc_checksum("test") | ||||
|         == "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08" | ||||
|     ) | ||||
|  | ||||
|  | ||||
| async def test_valid_checksum(): | ||||
|     """Test a valid autorization.""" | ||||
|     await cas_validate( | ||||
|         "notary@home-assistant.io", | ||||
|         "4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0", | ||||
|     ) | ||||
|  | ||||
|  | ||||
| async def test_invalid_checksum(): | ||||
|     """Test a invalid autorization.""" | ||||
|     with pytest.raises(CodeNotaryUntrusted): | ||||
|         await cas_validate( | ||||
|             "notary@home-assistant.io", | ||||
|             "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     "subprocess_exec", | ||||
|     [SubprocessResponse(returncode=1, error=b"x is not notarized")], | ||||
| ) | ||||
| async def test_not_notarized_error(subprocess_exec): | ||||
|     """Test received a not notarized error response from command.""" | ||||
|     with pytest.raises(CodeNotaryUntrusted): | ||||
|         await cas_validate( | ||||
|             "notary@home-assistant.io", | ||||
|             "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     "subprocess_exec", | ||||
|     [ | ||||
|         SubprocessResponse(returncode=1, error=b"test"), | ||||
|         SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'), | ||||
|         SubprocessResponse(returncode=1, error="test".encode("utf-16")), | ||||
|     ], | ||||
|     indirect=True, | ||||
| ) | ||||
| async def test_cas_backend_error(subprocess_exec): | ||||
|     """Test backend error executing cas command.""" | ||||
|     with pytest.raises(CodeNotaryBackendError): | ||||
|         await cas_validate( | ||||
|             "notary@home-assistant.io", | ||||
|             "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     "subprocess_exec", | ||||
|     [SubprocessResponse(returncode=0, data='{"status":1}')], | ||||
|     indirect=True, | ||||
| ) | ||||
| async def test_cas_notarized_untrusted(subprocess_exec): | ||||
|     """Test cas found notarized but untrusted content.""" | ||||
|     with pytest.raises(CodeNotaryUntrusted): | ||||
|         await cas_validate( | ||||
|             "notary@home-assistant.io", | ||||
|             "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", | ||||
|         ) | ||||
|  | ||||
|  | ||||
| @pytest.mark.parametrize( | ||||
|     "subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True | ||||
| ) | ||||
| async def test_cas_exec_os_error(subprocess_exec): | ||||
|     """Test os error attempting to execute cas command.""" | ||||
|     with pytest.raises(CodeNotaryError): | ||||
|         await cas_validate( | ||||
|             "notary@home-assistant.io", | ||||
|             "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", | ||||
|         ) | ||||
		Reference in New Issue
	
	Block a user