diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 4ad374b5d..43cea0e29 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,6 +1,8 @@ FROM mcr.microsoft.com/vscode/devcontainers/python:0-3.9 -ENV DEBIAN_FRONTEND=noninteractive +ENV \ + DEBIAN_FRONTEND=noninteractive \ + VCN_VERSION=0.9.8 SHELL ["/bin/bash", "-c"] @@ -50,7 +52,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ network-manager \ apparmor-utils \ libpulse0 \ - && bash <(curl https://getvcn.codenotary.com -L) \ + && curl -Lo /bin/vcn https://github.com/codenotary/vcn/releases/download/${VCN_VERSION}/vcn-${VCN_VERSION}-linux-amd64-static \ + && chmod a+x /bin/vcn \ && rm -rf /var/lib/apt/lists/* # Install Python dependencies from requirements.txt if it exists diff --git a/Dockerfile b/Dockerfile index fa9899b51..172a9ce18 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,9 +29,6 @@ RUN \ https://github.com/codenotary/vcn \ && cd vcn \ \ - # Fix: https://github.com/codenotary/vcn/issues/131 - && go get github.com/codenotary/immudb@4cf9e2ae06ac2e6ec98a60364c3de3eab5524757 \ - \ && if [ "${BUILD_ARCH}" = "armhf" ]; then \ GOARM=6 GOARCH=arm go build -o vcn -ldflags="-s -w" ./cmd/vcn; \ elif [ "${BUILD_ARCH}" = "armv7" ]; then \ diff --git a/build.json b/build.json index 14d2b0a30..a4c9ef943 100644 --- a/build.json +++ b/build.json @@ -9,7 +9,7 @@ "i386": "ghcr.io/home-assistant/i386-base-python:3.9-alpine3.14" }, "args": { - "VCN_VERSION": "0.9.4" + "VCN_VERSION": "0.9.8" }, "labels": { "io.hass.type": "supervisor", diff --git a/supervisor/docker/interface.py b/supervisor/docker/interface.py index c216dd6ec..00c0f2afd 100644 --- a/supervisor/docker/interface.py +++ b/supervisor/docker/interface.py @@ -624,3 +624,17 @@ class DockerInterface(CoreSysAttributes): self.sys_security.verify_own_content(checksum=checksum), self.sys_loop ) job.result(timeout=20) + + @process_lock + def check_trust(self) -> Awaitable[None]: + """Check trust of exists Docker image.""" + return self.sys_run_in_executor(self._check_trust) + + def _check_trust(self) -> None: + """Check trust of current image.""" + try: + image = self.sys_docker.images.get(f"{self.image}:{self.version!s}") + except (docker.errors.DockerException, requests.RequestException): + return + + self._validate_trust(image.id, self.image, self.version) diff --git a/supervisor/homeassistant/core.py b/supervisor/homeassistant/core.py index e80645e95..8ea5c17cc 100644 --- a/supervisor/homeassistant/core.py +++ b/supervisor/homeassistant/core.py @@ -309,6 +309,13 @@ class HomeAssistantCore(CoreSysAttributes): """ return self.instance.logs() + def check_trust(self) -> Awaitable[None]: + """Calculate HomeAssistant docker content trust. + + Return Coroutine. + """ + return self.instance.check_trust() + async def stats(self) -> DockerStats: """Return stats of Home Assistant. diff --git a/supervisor/resolution/checks/core_security.py b/supervisor/resolution/checks/core_security.py index 86ea61daf..b488c1c38 100644 --- a/supervisor/resolution/checks/core_security.py +++ b/supervisor/resolution/checks/core_security.py @@ -27,6 +27,7 @@ class CheckCoreSecurity(CheckBase): async def run_check(self) -> None: """Run check if not affected by issue.""" + # Security issue < 2021.1.5 & Custom components try: if self.sys_homeassistant.version < AwesomeVersion("2021.1.5"): if Path( diff --git a/supervisor/resolution/checks/core_trust.py b/supervisor/resolution/checks/core_trust.py new file mode 100644 index 000000000..075187526 --- /dev/null +++ b/supervisor/resolution/checks/core_trust.py @@ -0,0 +1,59 @@ +"""Helpers to check core trust.""" +import logging +from typing import List, Optional + +from ...const import CoreState +from ...coresys import CoreSys +from ...exceptions import CodeNotaryError, CodeNotaryUntrusted +from ..const import ContextType, IssueType, UnhealthyReason +from .base import CheckBase + +_LOGGER: logging.Logger = logging.getLogger(__name__) + + +def setup(coresys: CoreSys) -> CheckBase: + """Check setup function.""" + return CheckCoreTrust(coresys) + + +class CheckCoreTrust(CheckBase): + """CheckCoreTrust class for check.""" + + async def run_check(self) -> None: + """Run check if not affected by issue.""" + if not self.sys_security.content_trust: + _LOGGER.warning( + "Skipping %s, content_trust is globally disabled", self.slug + ) + return + + try: + await self.sys_homeassistant.core.check_trust() + except CodeNotaryUntrusted: + self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED + self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE) + except CodeNotaryError: + pass + + async def approve_check(self, reference: Optional[str] = None) -> bool: + """Approve check if it is affected by issue.""" + try: + await self.sys_homeassistant.core.check_trust() + except CodeNotaryError: + return True + return False + + @property + def issue(self) -> IssueType: + """Return a IssueType enum.""" + return IssueType.TRUST + + @property + def context(self) -> ContextType: + """Return a ContextType enum.""" + return ContextType.CORE + + @property + def states(self) -> List[CoreState]: + """Return a list of valid states when this check can run.""" + return [CoreState.RUNNING, CoreState.STARTUP] diff --git a/supervisor/resolution/checks/supervisor_trust.py b/supervisor/resolution/checks/supervisor_trust.py new file mode 100644 index 000000000..eff2f6188 --- /dev/null +++ b/supervisor/resolution/checks/supervisor_trust.py @@ -0,0 +1,59 @@ +"""Helpers to check supervisor trust.""" +import logging +from typing import List, Optional + +from ...const import CoreState +from ...coresys import CoreSys +from ...exceptions import CodeNotaryError, CodeNotaryUntrusted +from ..const import ContextType, IssueType, UnhealthyReason +from .base import CheckBase + +_LOGGER: logging.Logger = logging.getLogger(__name__) + + +def setup(coresys: CoreSys) -> CheckBase: + """Check setup function.""" + return CheckSupervisorTrust(coresys) + + +class CheckSupervisorTrust(CheckBase): + """CheckSystemTrust class for check.""" + + async def run_check(self) -> None: + """Run check if not affected by issue.""" + if not self.sys_security.content_trust: + _LOGGER.warning( + "Skipping %s, content_trust is globally disabled", self.slug + ) + return + + try: + await self.sys_supervisor.check_trust() + except CodeNotaryUntrusted: + self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED + self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR) + except CodeNotaryError: + pass + + async def approve_check(self, reference: Optional[str] = None) -> bool: + """Approve check if it is affected by issue.""" + try: + await self.sys_supervisor.check_trust() + except CodeNotaryError: + return True + return False + + @property + def issue(self) -> IssueType: + """Return a IssueType enum.""" + return IssueType.TRUST + + @property + def context(self) -> ContextType: + """Return a ContextType enum.""" + return ContextType.SUPERVISOR + + @property + def states(self) -> List[CoreState]: + """Return a list of valid states when this check can run.""" + return [CoreState.RUNNING, CoreState.STARTUP] diff --git a/supervisor/resolution/const.py b/supervisor/resolution/const.py index bd4e7dc8c..8dd026ecd 100644 --- a/supervisor/resolution/const.py +++ b/supervisor/resolution/const.py @@ -66,6 +66,7 @@ class IssueType(str, Enum): FATAL_ERROR = "fatal_error" DNS_LOOP = "dns_loop" PWNED = "pwned" + TRUST = "trust" class SuggestionType(str, Enum): diff --git a/supervisor/security.py b/supervisor/security.py index 03a61aac1..58add0330 100644 --- a/supervisor/security.py +++ b/supervisor/security.py @@ -11,7 +11,6 @@ from .const import ( ) from .coresys import CoreSys, CoreSysAttributes from .exceptions import CodeNotaryError, CodeNotaryUntrusted, PwnedError -from .resolution.const import UnhealthyReason from .utils.codenotary import vcn_validate from .utils.common import FileConfiguration from .utils.pwned import check_pwned_password @@ -69,7 +68,6 @@ class Security(FileConfiguration, CoreSysAttributes): try: await vcn_validate(checksum, path, org="home-assistant.io") except CodeNotaryUntrusted: - self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED raise except CodeNotaryError: if self.force: diff --git a/supervisor/supervisor.py b/supervisor/supervisor.py index fa93dd568..7944e1ac9 100644 --- a/supervisor/supervisor.py +++ b/supervisor/supervisor.py @@ -218,6 +218,13 @@ class Supervisor(CoreSysAttributes): """ return self.instance.logs() + def check_trust(self) -> Awaitable[None]: + """Calculate Supervisor docker content trust. + + Return Coroutine. + """ + return self.instance.check_trust() + async def stats(self) -> DockerStats: """Return stats of Supervisor.""" try: diff --git a/tests/resolution/check/test_check.py b/tests/resolution/check/test_check.py index 217243bbc..65715fd95 100644 --- a/tests/resolution/check/test_check.py +++ b/tests/resolution/check/test_check.py @@ -36,6 +36,7 @@ async def test_check_running(coresys: CoreSys): async def test_if_check_make_issue(coresys: CoreSys): """Test check for setup.""" coresys.core.state = CoreState.RUNNING + coresys.security.content_trust = False with patch("shutil.disk_usage", return_value=(1, 1, 1)): await coresys.resolution.check.check_system() @@ -46,6 +47,7 @@ async def test_if_check_make_issue(coresys: CoreSys): async def test_if_check_cleanup_issue(coresys: CoreSys): """Test check for setup.""" coresys.core.state = CoreState.RUNNING + coresys.security.content_trust = False with patch("shutil.disk_usage", return_value=(1, 1, 1)): await coresys.resolution.check.check_system() diff --git a/tests/resolution/check/test_check_core_trust.py b/tests/resolution/check/test_check_core_trust.py new file mode 100644 index 000000000..7fc994106 --- /dev/null +++ b/tests/resolution/check/test_check_core_trust.py @@ -0,0 +1,93 @@ +"""Test Check Addon Pwned.""" +# pylint: disable=import-error,protected-access +from unittest.mock import AsyncMock, patch + +from supervisor.const import CoreState +from supervisor.coresys import CoreSys +from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted +from supervisor.resolution.checks.core_trust import CheckCoreTrust +from supervisor.resolution.const import IssueType, UnhealthyReason + + +async def test_base(coresys: CoreSys): + """Test check basics.""" + core_trust = CheckCoreTrust(coresys) + assert core_trust.slug == "core_trust" + assert core_trust.enabled + + +async def test_check(coresys: CoreSys): + """Test check.""" + core_trust = CheckCoreTrust(coresys) + coresys.core.state = CoreState.RUNNING + + assert len(coresys.resolution.issues) == 0 + + coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError) + await core_trust.run_check() + assert coresys.homeassistant.core.check_trust.called + + coresys.homeassistant.core.check_trust = AsyncMock(return_value=None) + await core_trust.run_check() + assert coresys.homeassistant.core.check_trust.called + + assert len(coresys.resolution.issues) == 0 + + coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + await core_trust.run_check() + assert coresys.homeassistant.core.check_trust.called + + assert len(coresys.resolution.issues) == 1 + assert coresys.resolution.issues[-1].type == IssueType.TRUST + + assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy + + +async def test_approve(coresys: CoreSys): + """Test check.""" + core_trust = CheckCoreTrust(coresys) + coresys.core.state = CoreState.RUNNING + + coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + assert await core_trust.approve_check() + + coresys.homeassistant.core.check_trust = AsyncMock(return_value=None) + assert not await core_trust.approve_check() + + +async def test_with_global_disable(coresys: CoreSys, caplog): + """Test when pwned is globally disabled.""" + coresys.security.content_trust = False + core_trust = CheckCoreTrust(coresys) + coresys.core.state = CoreState.RUNNING + + assert len(coresys.resolution.issues) == 0 + coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) + await core_trust.run_check() + assert not coresys.security.verify_own_content.called + assert "Skipping core_trust, content_trust is globally disabled" in caplog.text + + +async def test_did_run(coresys: CoreSys): + """Test that the check ran as expected.""" + core_trust = CheckCoreTrust(coresys) + should_run = core_trust.states + should_not_run = [state for state in CoreState if state not in should_run] + assert len(should_run) != 0 + assert len(should_not_run) != 0 + + with patch( + "supervisor.resolution.checks.core_trust.CheckCoreTrust.run_check", + return_value=None, + ) as check: + for state in should_run: + coresys.core.state = state + await core_trust() + check.assert_called_once() + check.reset_mock() + + for state in should_not_run: + coresys.core.state = state + await core_trust() + check.assert_not_called() + check.reset_mock() diff --git a/tests/resolution/check/test_check_system_trust.py b/tests/resolution/check/test_check_system_trust.py new file mode 100644 index 000000000..3471dc6d9 --- /dev/null +++ b/tests/resolution/check/test_check_system_trust.py @@ -0,0 +1,95 @@ +"""Test Check Addon Pwned.""" +# pylint: disable=import-error,protected-access +from unittest.mock import AsyncMock, patch + +from supervisor.const import CoreState +from supervisor.coresys import CoreSys +from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted +from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust +from supervisor.resolution.const import IssueType, UnhealthyReason + + +async def test_base(coresys: CoreSys): + """Test check basics.""" + supervisor_trust = CheckSupervisorTrust(coresys) + assert supervisor_trust.slug == "supervisor_trust" + assert supervisor_trust.enabled + + +async def test_check(coresys: CoreSys): + """Test check.""" + supervisor_trust = CheckSupervisorTrust(coresys) + coresys.core.state = CoreState.RUNNING + + assert len(coresys.resolution.issues) == 0 + + coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError) + await supervisor_trust.run_check() + assert coresys.supervisor.check_trust.called + + coresys.supervisor.check_trust = AsyncMock(return_value=None) + await supervisor_trust.run_check() + assert coresys.supervisor.check_trust.called + + assert len(coresys.resolution.issues) == 0 + + coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + await supervisor_trust.run_check() + assert coresys.supervisor.check_trust.called + + assert len(coresys.resolution.issues) == 1 + assert coresys.resolution.issues[-1].type == IssueType.TRUST + + assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy + + +async def test_approve(coresys: CoreSys): + """Test check.""" + supervisor_trust = CheckSupervisorTrust(coresys) + coresys.core.state = CoreState.RUNNING + + coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted) + assert await supervisor_trust.approve_check() + + coresys.supervisor.check_trust = AsyncMock(return_value=None) + assert not await supervisor_trust.approve_check() + + +async def test_with_global_disable(coresys: CoreSys, caplog): + """Test when pwned is globally disabled.""" + coresys.security.content_trust = False + supervisor_trust = CheckSupervisorTrust(coresys) + coresys.core.state = CoreState.RUNNING + + assert len(coresys.resolution.issues) == 0 + coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted) + await supervisor_trust.run_check() + assert not coresys.security.verify_own_content.called + assert ( + "Skipping supervisor_trust, content_trust is globally disabled" in caplog.text + ) + + +async def test_did_run(coresys: CoreSys): + """Test that the check ran as expected.""" + supervisor_trust = CheckSupervisorTrust(coresys) + should_run = supervisor_trust.states + should_not_run = [state for state in CoreState if state not in should_run] + assert len(should_run) != 0 + assert len(should_not_run) != 0 + + with patch( + "supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check", + return_value=None, + ) as check: + for state in should_run: + coresys.core.state = state + await supervisor_trust() + check.assert_called_once() + check.reset_mock() + + for state in should_not_run: + coresys.core.state = state + await supervisor_trust() + check.assert_not_called() + check.reset_mock()