Codenotary vcn 0.9.8 - System trust check (#3070)

* Update codenotary 0.9.8

* Fix lint

* ditch wrong code

* Fix run health check

* remove old code

* better structured

* more cleaner

* Fix core

* Add tests

* addjust test

* Split checks

* Update supervisor/resolution/checks/core_trust.py

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>

* Fix line end

Co-authored-by: Joakim Sørensen <joasoe@gmail.com>
This commit is contained in:
Pascal Vizeli 2021-08-24 12:19:08 +02:00 committed by GitHub
parent 62b364ea29
commit d54c23952f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 344 additions and 8 deletions

View File

@ -1,6 +1,8 @@
FROM mcr.microsoft.com/vscode/devcontainers/python:0-3.9 FROM mcr.microsoft.com/vscode/devcontainers/python:0-3.9
ENV DEBIAN_FRONTEND=noninteractive ENV \
DEBIAN_FRONTEND=noninteractive \
VCN_VERSION=0.9.8
SHELL ["/bin/bash", "-c"] SHELL ["/bin/bash", "-c"]
@ -50,7 +52,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
network-manager \ network-manager \
apparmor-utils \ apparmor-utils \
libpulse0 \ libpulse0 \
&& bash <(curl https://getvcn.codenotary.com -L) \ && curl -Lo /bin/vcn https://github.com/codenotary/vcn/releases/download/${VCN_VERSION}/vcn-${VCN_VERSION}-linux-amd64-static \
&& chmod a+x /bin/vcn \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# Install Python dependencies from requirements.txt if it exists # Install Python dependencies from requirements.txt if it exists

View File

@ -29,9 +29,6 @@ RUN \
https://github.com/codenotary/vcn \ https://github.com/codenotary/vcn \
&& cd vcn \ && cd vcn \
\ \
# Fix: https://github.com/codenotary/vcn/issues/131
&& go get github.com/codenotary/immudb@4cf9e2ae06ac2e6ec98a60364c3de3eab5524757 \
\
&& if [ "${BUILD_ARCH}" = "armhf" ]; then \ && if [ "${BUILD_ARCH}" = "armhf" ]; then \
GOARM=6 GOARCH=arm go build -o vcn -ldflags="-s -w" ./cmd/vcn; \ GOARM=6 GOARCH=arm go build -o vcn -ldflags="-s -w" ./cmd/vcn; \
elif [ "${BUILD_ARCH}" = "armv7" ]; then \ elif [ "${BUILD_ARCH}" = "armv7" ]; then \

View File

@ -9,7 +9,7 @@
"i386": "ghcr.io/home-assistant/i386-base-python:3.9-alpine3.14" "i386": "ghcr.io/home-assistant/i386-base-python:3.9-alpine3.14"
}, },
"args": { "args": {
"VCN_VERSION": "0.9.4" "VCN_VERSION": "0.9.8"
}, },
"labels": { "labels": {
"io.hass.type": "supervisor", "io.hass.type": "supervisor",

View File

@ -624,3 +624,17 @@ class DockerInterface(CoreSysAttributes):
self.sys_security.verify_own_content(checksum=checksum), self.sys_loop self.sys_security.verify_own_content(checksum=checksum), self.sys_loop
) )
job.result(timeout=20) job.result(timeout=20)
@process_lock
def check_trust(self) -> Awaitable[None]:
"""Check trust of exists Docker image."""
return self.sys_run_in_executor(self._check_trust)
def _check_trust(self) -> None:
"""Check trust of current image."""
try:
image = self.sys_docker.images.get(f"{self.image}:{self.version!s}")
except (docker.errors.DockerException, requests.RequestException):
return
self._validate_trust(image.id, self.image, self.version)

View File

@ -309,6 +309,13 @@ class HomeAssistantCore(CoreSysAttributes):
""" """
return self.instance.logs() return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate HomeAssistant docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats: async def stats(self) -> DockerStats:
"""Return stats of Home Assistant. """Return stats of Home Assistant.

View File

@ -27,6 +27,7 @@ class CheckCoreSecurity(CheckBase):
async def run_check(self) -> None: async def run_check(self) -> None:
"""Run check if not affected by issue.""" """Run check if not affected by issue."""
# Security issue < 2021.1.5 & Custom components
try: try:
if self.sys_homeassistant.version < AwesomeVersion("2021.1.5"): if self.sys_homeassistant.version < AwesomeVersion("2021.1.5"):
if Path( if Path(

View File

@ -0,0 +1,59 @@
"""Helpers to check core trust."""
import logging
from typing import List, Optional
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckCoreTrust(coresys)
class CheckCoreTrust(CheckBase):
"""CheckCoreTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
try:
await self.sys_homeassistant.core.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
except CodeNotaryError:
pass
async def approve_check(self, reference: Optional[str] = None) -> bool:
"""Approve check if it is affected by issue."""
try:
await self.sys_homeassistant.core.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.CORE
@property
def states(self) -> List[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -0,0 +1,59 @@
"""Helpers to check supervisor trust."""
import logging
from typing import List, Optional
from ...const import CoreState
from ...coresys import CoreSys
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
from ..const import ContextType, IssueType, UnhealthyReason
from .base import CheckBase
_LOGGER: logging.Logger = logging.getLogger(__name__)
def setup(coresys: CoreSys) -> CheckBase:
"""Check setup function."""
return CheckSupervisorTrust(coresys)
class CheckSupervisorTrust(CheckBase):
"""CheckSystemTrust class for check."""
async def run_check(self) -> None:
"""Run check if not affected by issue."""
if not self.sys_security.content_trust:
_LOGGER.warning(
"Skipping %s, content_trust is globally disabled", self.slug
)
return
try:
await self.sys_supervisor.check_trust()
except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
except CodeNotaryError:
pass
async def approve_check(self, reference: Optional[str] = None) -> bool:
"""Approve check if it is affected by issue."""
try:
await self.sys_supervisor.check_trust()
except CodeNotaryError:
return True
return False
@property
def issue(self) -> IssueType:
"""Return a IssueType enum."""
return IssueType.TRUST
@property
def context(self) -> ContextType:
"""Return a ContextType enum."""
return ContextType.SUPERVISOR
@property
def states(self) -> List[CoreState]:
"""Return a list of valid states when this check can run."""
return [CoreState.RUNNING, CoreState.STARTUP]

View File

@ -66,6 +66,7 @@ class IssueType(str, Enum):
FATAL_ERROR = "fatal_error" FATAL_ERROR = "fatal_error"
DNS_LOOP = "dns_loop" DNS_LOOP = "dns_loop"
PWNED = "pwned" PWNED = "pwned"
TRUST = "trust"
class SuggestionType(str, Enum): class SuggestionType(str, Enum):

View File

@ -11,7 +11,6 @@ from .const import (
) )
from .coresys import CoreSys, CoreSysAttributes from .coresys import CoreSys, CoreSysAttributes
from .exceptions import CodeNotaryError, CodeNotaryUntrusted, PwnedError from .exceptions import CodeNotaryError, CodeNotaryUntrusted, PwnedError
from .resolution.const import UnhealthyReason
from .utils.codenotary import vcn_validate from .utils.codenotary import vcn_validate
from .utils.common import FileConfiguration from .utils.common import FileConfiguration
from .utils.pwned import check_pwned_password from .utils.pwned import check_pwned_password
@ -69,7 +68,6 @@ class Security(FileConfiguration, CoreSysAttributes):
try: try:
await vcn_validate(checksum, path, org="home-assistant.io") await vcn_validate(checksum, path, org="home-assistant.io")
except CodeNotaryUntrusted: except CodeNotaryUntrusted:
self.sys_resolution.unhealthy = UnhealthyReason.UNTRUSTED
raise raise
except CodeNotaryError: except CodeNotaryError:
if self.force: if self.force:

View File

@ -218,6 +218,13 @@ class Supervisor(CoreSysAttributes):
""" """
return self.instance.logs() return self.instance.logs()
def check_trust(self) -> Awaitable[None]:
"""Calculate Supervisor docker content trust.
Return Coroutine.
"""
return self.instance.check_trust()
async def stats(self) -> DockerStats: async def stats(self) -> DockerStats:
"""Return stats of Supervisor.""" """Return stats of Supervisor."""
try: try:

View File

@ -36,6 +36,7 @@ async def test_check_running(coresys: CoreSys):
async def test_if_check_make_issue(coresys: CoreSys): async def test_if_check_make_issue(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
coresys.core.state = CoreState.RUNNING coresys.core.state = CoreState.RUNNING
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)): with patch("shutil.disk_usage", return_value=(1, 1, 1)):
await coresys.resolution.check.check_system() await coresys.resolution.check.check_system()
@ -46,6 +47,7 @@ async def test_if_check_make_issue(coresys: CoreSys):
async def test_if_check_cleanup_issue(coresys: CoreSys): async def test_if_check_cleanup_issue(coresys: CoreSys):
"""Test check for setup.""" """Test check for setup."""
coresys.core.state = CoreState.RUNNING coresys.core.state = CoreState.RUNNING
coresys.security.content_trust = False
with patch("shutil.disk_usage", return_value=(1, 1, 1)): with patch("shutil.disk_usage", return_value=(1, 1, 1)):
await coresys.resolution.check.check_system() await coresys.resolution.check.check_system()

View File

@ -0,0 +1,93 @@
"""Test Check Addon Pwned."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.core_trust import CheckCoreTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
core_trust = CheckCoreTrust(coresys)
assert core_trust.slug == "core_trust"
assert core_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
core_trust = CheckCoreTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
await core_trust.run_check()
assert coresys.homeassistant.core.check_trust.called
coresys.homeassistant.core.check_trust = AsyncMock(return_value=None)
await core_trust.run_check()
assert coresys.homeassistant.core.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await core_trust.run_check()
assert coresys.homeassistant.core.check_trust.called
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
core_trust = CheckCoreTrust(coresys)
coresys.core.state = CoreState.RUNNING
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await core_trust.approve_check()
coresys.homeassistant.core.check_trust = AsyncMock(return_value=None)
assert not await core_trust.approve_check()
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
core_trust = CheckCoreTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await core_trust.run_check()
assert not coresys.security.verify_own_content.called
assert "Skipping core_trust, content_trust is globally disabled" in caplog.text
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
core_trust = CheckCoreTrust(coresys)
should_run = core_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.core_trust.CheckCoreTrust.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await core_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await core_trust()
check.assert_not_called()
check.reset_mock()

View File

@ -0,0 +1,95 @@
"""Test Check Addon Pwned."""
# pylint: disable=import-error,protected-access
from unittest.mock import AsyncMock, patch
from supervisor.const import CoreState
from supervisor.coresys import CoreSys
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust
from supervisor.resolution.const import IssueType, UnhealthyReason
async def test_base(coresys: CoreSys):
"""Test check basics."""
supervisor_trust = CheckSupervisorTrust(coresys)
assert supervisor_trust.slug == "supervisor_trust"
assert supervisor_trust.enabled
async def test_check(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
coresys.supervisor.check_trust = AsyncMock(return_value=None)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 0
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert coresys.supervisor.check_trust.called
assert len(coresys.resolution.issues) == 1
assert coresys.resolution.issues[-1].type == IssueType.TRUST
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
async def test_approve(coresys: CoreSys):
"""Test check."""
supervisor_trust = CheckSupervisorTrust(coresys)
coresys.core.state = CoreState.RUNNING
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
assert await supervisor_trust.approve_check()
coresys.supervisor.check_trust = AsyncMock(return_value=None)
assert not await supervisor_trust.approve_check()
async def test_with_global_disable(coresys: CoreSys, caplog):
"""Test when pwned is globally disabled."""
coresys.security.content_trust = False
supervisor_trust = CheckSupervisorTrust(coresys)
coresys.core.state = CoreState.RUNNING
assert len(coresys.resolution.issues) == 0
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
await supervisor_trust.run_check()
assert not coresys.security.verify_own_content.called
assert (
"Skipping supervisor_trust, content_trust is globally disabled" in caplog.text
)
async def test_did_run(coresys: CoreSys):
"""Test that the check ran as expected."""
supervisor_trust = CheckSupervisorTrust(coresys)
should_run = supervisor_trust.states
should_not_run = [state for state in CoreState if state not in should_run]
assert len(should_run) != 0
assert len(should_not_run) != 0
with patch(
"supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check",
return_value=None,
) as check:
for state in should_run:
coresys.core.state = state
await supervisor_trust()
check.assert_called_once()
check.reset_mock()
for state in should_not_run:
coresys.core.state = state
await supervisor_trust()
check.assert_not_called()
check.reset_mock()