mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-11-09 02:49:43 +00:00
Remove Codenotary integrity check (#6236)
* Formally deprecate CodeNotary build config * Remove CodeNotary specific integrity checking The current code is specific to how CodeNotary was doing integrity checking. A future integrity checking mechanism likely will work differently (e.g. through EROFS based containers). Remove the current code to make way for a future implementation. * Drop CodeNotary integrity fixups * Drop unused tests * Fix pytest * Fix pytest * Remove CodeNotary related exceptions and handling Remove CodeNotary related exceptions and handling from the Docker interface. * Drop unnecessary comment * Remove Codenotary specific IssueType/SuggestionType * Drop Codenotary specific environment and secret reference * Remove unused constants * Introduce APIGone exception for removed APIs Introduce a new exception class APIGone to indicate that certain API features have been removed and are no longer available. Update the security integrity check endpoint to raise this new exception instead of a generic APIError, providing clearer communication to clients that the feature has been intentionally removed. * Drop content trust A cosign based signature verification will likely be named differently to avoid confusion with existing implementations. For now, remove the content trust option entirely. * Drop code sign test * Remove source_mods/content_trust evaluations * Remove content_trust reference in bootstrap.py * Fix security tests * Drop unused tests * Drop codenotary from schema Since we have "remove extra" in voluptuous, we can remove the codenotary field from the addon schema. * Remove content_trust from tests * Remove content_trust unsupported reason * Remove unnecessary comment * Remove unrelated pytest * Remove unrelated fixtures
This commit is contained in:
29
.github/workflows/builder.yml
vendored
29
.github/workflows/builder.yml
vendored
@@ -170,8 +170,6 @@ jobs:
|
|||||||
--target /data \
|
--target /data \
|
||||||
--cosign \
|
--cosign \
|
||||||
--generic ${{ needs.init.outputs.version }}
|
--generic ${{ needs.init.outputs.version }}
|
||||||
env:
|
|
||||||
CAS_API_KEY: ${{ secrets.CAS_TOKEN }}
|
|
||||||
|
|
||||||
version:
|
version:
|
||||||
name: Update version
|
name: Update version
|
||||||
@@ -293,33 +291,6 @@ jobs:
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
- name: Check the Supervisor code sign
|
|
||||||
if: needs.init.outputs.publish == 'true'
|
|
||||||
run: |
|
|
||||||
echo "Enable Content-Trust"
|
|
||||||
test=$(docker exec hassio_cli ha security options --content-trust=true --no-progress --raw-json | jq -r '.result')
|
|
||||||
if [ "$test" != "ok" ]; then
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Run supervisor health check"
|
|
||||||
test=$(docker exec hassio_cli ha resolution healthcheck --no-progress --raw-json | jq -r '.result')
|
|
||||||
if [ "$test" != "ok" ]; then
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Check supervisor unhealthy"
|
|
||||||
test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unhealthy[]')
|
|
||||||
if [ "$test" != "" ]; then
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Check supervisor supported"
|
|
||||||
test=$(docker exec hassio_cli ha resolution info --no-progress --raw-json | jq -r '.data.unsupported[]')
|
|
||||||
if [[ "$test" =~ source_mods ]]; then
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
- name: Create full backup
|
- name: Create full backup
|
||||||
id: backup
|
id: backup
|
||||||
run: |
|
run: |
|
||||||
|
|||||||
@@ -1513,13 +1513,6 @@ class Addon(AddonModel):
|
|||||||
_LOGGER.info("Finished restore for add-on %s", self.slug)
|
_LOGGER.info("Finished restore for add-on %s", self.slug)
|
||||||
return wait_for_start
|
return wait_for_start
|
||||||
|
|
||||||
def check_trust(self) -> Awaitable[None]:
|
|
||||||
"""Calculate Addon docker content trust.
|
|
||||||
|
|
||||||
Return Coroutine.
|
|
||||||
"""
|
|
||||||
return self.instance.check_trust()
|
|
||||||
|
|
||||||
@Job(
|
@Job(
|
||||||
name="addon_restart_after_problem",
|
name="addon_restart_after_problem",
|
||||||
throttle_period=WATCHDOG_THROTTLE_PERIOD,
|
throttle_period=WATCHDOG_THROTTLE_PERIOD,
|
||||||
|
|||||||
@@ -103,7 +103,6 @@ from .configuration import FolderMapping
|
|||||||
from .const import (
|
from .const import (
|
||||||
ATTR_BACKUP,
|
ATTR_BACKUP,
|
||||||
ATTR_BREAKING_VERSIONS,
|
ATTR_BREAKING_VERSIONS,
|
||||||
ATTR_CODENOTARY,
|
|
||||||
ATTR_PATH,
|
ATTR_PATH,
|
||||||
ATTR_READ_ONLY,
|
ATTR_READ_ONLY,
|
||||||
AddonBackupMode,
|
AddonBackupMode,
|
||||||
@@ -632,13 +631,8 @@ class AddonModel(JobGroup, ABC):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def signed(self) -> bool:
|
def signed(self) -> bool:
|
||||||
"""Return True if the image is signed."""
|
"""Currently no signing support."""
|
||||||
return ATTR_CODENOTARY in self.data
|
return False
|
||||||
|
|
||||||
@property
|
|
||||||
def codenotary(self) -> str | None:
|
|
||||||
"""Return Signer email address for CAS."""
|
|
||||||
return self.data.get(ATTR_CODENOTARY)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def breaking_versions(self) -> list[AwesomeVersion]:
|
def breaking_versions(self) -> list[AwesomeVersion]:
|
||||||
|
|||||||
@@ -207,6 +207,12 @@ def _warn_addon_config(config: dict[str, Any]):
|
|||||||
name,
|
name,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if ATTR_CODENOTARY in config:
|
||||||
|
_LOGGER.warning(
|
||||||
|
"Add-on '%s' uses deprecated 'codenotary' field in config. This field is no longer used and will be ignored. Please report this to the maintainer.",
|
||||||
|
name,
|
||||||
|
)
|
||||||
|
|
||||||
return config
|
return config
|
||||||
|
|
||||||
|
|
||||||
@@ -417,7 +423,6 @@ _SCHEMA_ADDON_CONFIG = vol.Schema(
|
|||||||
vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce(
|
vol.Optional(ATTR_BACKUP, default=AddonBackupMode.HOT): vol.Coerce(
|
||||||
AddonBackupMode
|
AddonBackupMode
|
||||||
),
|
),
|
||||||
vol.Optional(ATTR_CODENOTARY): vol.Email(),
|
|
||||||
vol.Optional(ATTR_OPTIONS, default={}): dict,
|
vol.Optional(ATTR_OPTIONS, default={}): dict,
|
||||||
vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
|
vol.Optional(ATTR_SCHEMA, default={}): vol.Any(
|
||||||
vol.Schema({str: SCHEMA_ELEMENT}),
|
vol.Schema({str: SCHEMA_ELEMENT}),
|
||||||
|
|||||||
@@ -1,24 +1,20 @@
|
|||||||
"""Init file for Supervisor Security RESTful API."""
|
"""Init file for Supervisor Security RESTful API."""
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
import attr
|
|
||||||
import voluptuous as vol
|
import voluptuous as vol
|
||||||
|
|
||||||
from ..const import ATTR_CONTENT_TRUST, ATTR_FORCE_SECURITY, ATTR_PWNED
|
from supervisor.exceptions import APIGone
|
||||||
|
|
||||||
|
from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED
|
||||||
from ..coresys import CoreSysAttributes
|
from ..coresys import CoreSysAttributes
|
||||||
from .utils import api_process, api_validate
|
from .utils import api_process, api_validate
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# pylint: disable=no-value-for-parameter
|
# pylint: disable=no-value-for-parameter
|
||||||
SCHEMA_OPTIONS = vol.Schema(
|
SCHEMA_OPTIONS = vol.Schema(
|
||||||
{
|
{
|
||||||
vol.Optional(ATTR_PWNED): vol.Boolean(),
|
vol.Optional(ATTR_PWNED): vol.Boolean(),
|
||||||
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
|
|
||||||
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
|
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
@@ -31,7 +27,6 @@ class APISecurity(CoreSysAttributes):
|
|||||||
async def info(self, request: web.Request) -> dict[str, Any]:
|
async def info(self, request: web.Request) -> dict[str, Any]:
|
||||||
"""Return Security information."""
|
"""Return Security information."""
|
||||||
return {
|
return {
|
||||||
ATTR_CONTENT_TRUST: self.sys_security.content_trust,
|
|
||||||
ATTR_PWNED: self.sys_security.pwned,
|
ATTR_PWNED: self.sys_security.pwned,
|
||||||
ATTR_FORCE_SECURITY: self.sys_security.force,
|
ATTR_FORCE_SECURITY: self.sys_security.force,
|
||||||
}
|
}
|
||||||
@@ -43,8 +38,6 @@ class APISecurity(CoreSysAttributes):
|
|||||||
|
|
||||||
if ATTR_PWNED in body:
|
if ATTR_PWNED in body:
|
||||||
self.sys_security.pwned = body[ATTR_PWNED]
|
self.sys_security.pwned = body[ATTR_PWNED]
|
||||||
if ATTR_CONTENT_TRUST in body:
|
|
||||||
self.sys_security.content_trust = body[ATTR_CONTENT_TRUST]
|
|
||||||
if ATTR_FORCE_SECURITY in body:
|
if ATTR_FORCE_SECURITY in body:
|
||||||
self.sys_security.force = body[ATTR_FORCE_SECURITY]
|
self.sys_security.force = body[ATTR_FORCE_SECURITY]
|
||||||
|
|
||||||
@@ -54,6 +47,9 @@ class APISecurity(CoreSysAttributes):
|
|||||||
|
|
||||||
@api_process
|
@api_process
|
||||||
async def integrity_check(self, request: web.Request) -> dict[str, Any]:
|
async def integrity_check(self, request: web.Request) -> dict[str, Any]:
|
||||||
"""Run backend integrity check."""
|
"""Run backend integrity check.
|
||||||
result = await asyncio.shield(self.sys_security.integrity_check())
|
|
||||||
return attr.asdict(result)
|
CodeNotary integrity checking has been removed. This endpoint now returns
|
||||||
|
an error indicating the feature is gone.
|
||||||
|
"""
|
||||||
|
raise APIGone("Integrity check feature has been removed.")
|
||||||
|
|||||||
@@ -16,14 +16,12 @@ from ..const import (
|
|||||||
ATTR_BLK_READ,
|
ATTR_BLK_READ,
|
||||||
ATTR_BLK_WRITE,
|
ATTR_BLK_WRITE,
|
||||||
ATTR_CHANNEL,
|
ATTR_CHANNEL,
|
||||||
ATTR_CONTENT_TRUST,
|
|
||||||
ATTR_COUNTRY,
|
ATTR_COUNTRY,
|
||||||
ATTR_CPU_PERCENT,
|
ATTR_CPU_PERCENT,
|
||||||
ATTR_DEBUG,
|
ATTR_DEBUG,
|
||||||
ATTR_DEBUG_BLOCK,
|
ATTR_DEBUG_BLOCK,
|
||||||
ATTR_DETECT_BLOCKING_IO,
|
ATTR_DETECT_BLOCKING_IO,
|
||||||
ATTR_DIAGNOSTICS,
|
ATTR_DIAGNOSTICS,
|
||||||
ATTR_FORCE_SECURITY,
|
|
||||||
ATTR_HEALTHY,
|
ATTR_HEALTHY,
|
||||||
ATTR_ICON,
|
ATTR_ICON,
|
||||||
ATTR_IP_ADDRESS,
|
ATTR_IP_ADDRESS,
|
||||||
@@ -69,8 +67,6 @@ SCHEMA_OPTIONS = vol.Schema(
|
|||||||
vol.Optional(ATTR_DEBUG): vol.Boolean(),
|
vol.Optional(ATTR_DEBUG): vol.Boolean(),
|
||||||
vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(),
|
vol.Optional(ATTR_DEBUG_BLOCK): vol.Boolean(),
|
||||||
vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(),
|
vol.Optional(ATTR_DIAGNOSTICS): vol.Boolean(),
|
||||||
vol.Optional(ATTR_CONTENT_TRUST): vol.Boolean(),
|
|
||||||
vol.Optional(ATTR_FORCE_SECURITY): vol.Boolean(),
|
|
||||||
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
|
vol.Optional(ATTR_AUTO_UPDATE): vol.Boolean(),
|
||||||
vol.Optional(ATTR_DETECT_BLOCKING_IO): vol.Coerce(DetectBlockingIO),
|
vol.Optional(ATTR_DETECT_BLOCKING_IO): vol.Coerce(DetectBlockingIO),
|
||||||
vol.Optional(ATTR_COUNTRY): str,
|
vol.Optional(ATTR_COUNTRY): str,
|
||||||
|
|||||||
@@ -105,7 +105,6 @@ async def initialize_coresys() -> CoreSys:
|
|||||||
|
|
||||||
if coresys.dev:
|
if coresys.dev:
|
||||||
coresys.updater.channel = UpdateChannel.DEV
|
coresys.updater.channel = UpdateChannel.DEV
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
# Convert datetime
|
# Convert datetime
|
||||||
logging.Formatter.converter = lambda *args: coresys.now().timetuple()
|
logging.Formatter.converter = lambda *args: coresys.now().timetuple()
|
||||||
|
|||||||
@@ -846,16 +846,6 @@ class DockerAddon(DockerInterface):
|
|||||||
):
|
):
|
||||||
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
|
self.sys_resolution.dismiss_issue(self.addon.device_access_missing_issue)
|
||||||
|
|
||||||
async def _validate_trust(self, image_id: str) -> None:
|
|
||||||
"""Validate trust of content."""
|
|
||||||
if not self.addon.signed:
|
|
||||||
return
|
|
||||||
|
|
||||||
checksum = image_id.partition(":")[2]
|
|
||||||
return await self.sys_security.verify_content(
|
|
||||||
cast(str, self.addon.codenotary), checksum
|
|
||||||
)
|
|
||||||
|
|
||||||
@Job(
|
@Job(
|
||||||
name="docker_addon_hardware_events",
|
name="docker_addon_hardware_events",
|
||||||
conditions=[JobCondition.OS_AGENT],
|
conditions=[JobCondition.OS_AGENT],
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from ipaddress import IPv4Address
|
|||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from awesomeversion import AwesomeVersion, AwesomeVersionCompareException
|
from awesomeversion import AwesomeVersion
|
||||||
from docker.types import Mount
|
from docker.types import Mount
|
||||||
|
|
||||||
from ..const import LABEL_MACHINE
|
from ..const import LABEL_MACHINE
|
||||||
@@ -244,13 +244,3 @@ class DockerHomeAssistant(DockerInterface):
|
|||||||
self.image,
|
self.image,
|
||||||
self.sys_homeassistant.version,
|
self.sys_homeassistant.version,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _validate_trust(self, image_id: str) -> None:
|
|
||||||
"""Validate trust of content."""
|
|
||||||
try:
|
|
||||||
if self.version in {None, LANDINGPAGE} or self.version < _VERIFY_TRUST:
|
|
||||||
return
|
|
||||||
except AwesomeVersionCompareException:
|
|
||||||
return
|
|
||||||
|
|
||||||
await super()._validate_trust(image_id)
|
|
||||||
|
|||||||
@@ -31,15 +31,12 @@ from ..const import (
|
|||||||
)
|
)
|
||||||
from ..coresys import CoreSys
|
from ..coresys import CoreSys
|
||||||
from ..exceptions import (
|
from ..exceptions import (
|
||||||
CodeNotaryError,
|
|
||||||
CodeNotaryUntrusted,
|
|
||||||
DockerAPIError,
|
DockerAPIError,
|
||||||
DockerError,
|
DockerError,
|
||||||
DockerJobError,
|
DockerJobError,
|
||||||
DockerLogOutOfOrder,
|
DockerLogOutOfOrder,
|
||||||
DockerNotFound,
|
DockerNotFound,
|
||||||
DockerRequestError,
|
DockerRequestError,
|
||||||
DockerTrustError,
|
|
||||||
)
|
)
|
||||||
from ..jobs import SupervisorJob
|
from ..jobs import SupervisorJob
|
||||||
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
|
from ..jobs.const import JOB_GROUP_DOCKER_INTERFACE, JobConcurrency
|
||||||
@@ -425,18 +422,6 @@ class DockerInterface(JobGroup, ABC):
|
|||||||
platform=MAP_ARCH[image_arch],
|
platform=MAP_ARCH[image_arch],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Validate content
|
|
||||||
try:
|
|
||||||
await self._validate_trust(cast(str, docker_image.id))
|
|
||||||
except CodeNotaryError:
|
|
||||||
with suppress(docker.errors.DockerException):
|
|
||||||
await self.sys_run_in_executor(
|
|
||||||
self.sys_docker.images.remove,
|
|
||||||
image=f"{image}:{version!s}",
|
|
||||||
force=True,
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
# Tag latest
|
# Tag latest
|
||||||
if latest:
|
if latest:
|
||||||
_LOGGER.info(
|
_LOGGER.info(
|
||||||
@@ -462,16 +447,6 @@ class DockerInterface(JobGroup, ABC):
|
|||||||
raise DockerError(
|
raise DockerError(
|
||||||
f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error
|
f"Unknown error with {image}:{version!s} -> {err!s}", _LOGGER.error
|
||||||
) from err
|
) from err
|
||||||
except CodeNotaryUntrusted as err:
|
|
||||||
raise DockerTrustError(
|
|
||||||
f"Pulled image {image}:{version!s} failed on content-trust verification!",
|
|
||||||
_LOGGER.critical,
|
|
||||||
) from err
|
|
||||||
except CodeNotaryError as err:
|
|
||||||
raise DockerTrustError(
|
|
||||||
f"Error happened on Content-Trust check for {image}:{version!s}: {err!s}",
|
|
||||||
_LOGGER.error,
|
|
||||||
) from err
|
|
||||||
finally:
|
finally:
|
||||||
if listener:
|
if listener:
|
||||||
self.sys_bus.remove_listener(listener)
|
self.sys_bus.remove_listener(listener)
|
||||||
@@ -809,24 +784,3 @@ class DockerInterface(JobGroup, ABC):
|
|||||||
return self.sys_run_in_executor(
|
return self.sys_run_in_executor(
|
||||||
self.sys_docker.container_run_inside, self.name, command
|
self.sys_docker.container_run_inside, self.name, command
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _validate_trust(self, image_id: str) -> None:
|
|
||||||
"""Validate trust of content."""
|
|
||||||
checksum = image_id.partition(":")[2]
|
|
||||||
return await self.sys_security.verify_own_content(checksum)
|
|
||||||
|
|
||||||
@Job(
|
|
||||||
name="docker_interface_check_trust",
|
|
||||||
on_condition=DockerJobError,
|
|
||||||
concurrency=JobConcurrency.GROUP_REJECT,
|
|
||||||
)
|
|
||||||
async def check_trust(self) -> None:
|
|
||||||
"""Check trust of exists Docker image."""
|
|
||||||
try:
|
|
||||||
image = await self.sys_run_in_executor(
|
|
||||||
self.sys_docker.images.get, f"{self.image}:{self.version!s}"
|
|
||||||
)
|
|
||||||
except (docker.errors.DockerException, requests.RequestException):
|
|
||||||
return
|
|
||||||
|
|
||||||
await self._validate_trust(cast(str, image.id))
|
|
||||||
|
|||||||
@@ -423,6 +423,12 @@ class APINotFound(APIError):
|
|||||||
status = 404
|
status = 404
|
||||||
|
|
||||||
|
|
||||||
|
class APIGone(APIError):
|
||||||
|
"""API is no longer available."""
|
||||||
|
|
||||||
|
status = 410
|
||||||
|
|
||||||
|
|
||||||
class APIAddonNotInstalled(APIError):
|
class APIAddonNotInstalled(APIError):
|
||||||
"""Not installed addon requested at addons API."""
|
"""Not installed addon requested at addons API."""
|
||||||
|
|
||||||
@@ -577,21 +583,6 @@ class PwnedConnectivityError(PwnedError):
|
|||||||
"""Connectivity errors while checking pwned passwords."""
|
"""Connectivity errors while checking pwned passwords."""
|
||||||
|
|
||||||
|
|
||||||
# util/codenotary
|
|
||||||
|
|
||||||
|
|
||||||
class CodeNotaryError(HassioError):
|
|
||||||
"""Error general with CodeNotary."""
|
|
||||||
|
|
||||||
|
|
||||||
class CodeNotaryUntrusted(CodeNotaryError):
|
|
||||||
"""Error on untrusted content."""
|
|
||||||
|
|
||||||
|
|
||||||
class CodeNotaryBackendError(CodeNotaryError):
|
|
||||||
"""CodeNotary backend error happening."""
|
|
||||||
|
|
||||||
|
|
||||||
# util/whoami
|
# util/whoami
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -428,13 +428,6 @@ class HomeAssistantCore(JobGroup):
|
|||||||
"""
|
"""
|
||||||
return self.instance.logs()
|
return self.instance.logs()
|
||||||
|
|
||||||
def check_trust(self) -> Awaitable[None]:
|
|
||||||
"""Calculate HomeAssistant docker content trust.
|
|
||||||
|
|
||||||
Return Coroutine.
|
|
||||||
"""
|
|
||||||
return self.instance.check_trust()
|
|
||||||
|
|
||||||
async def stats(self) -> DockerStats:
|
async def stats(self) -> DockerStats:
|
||||||
"""Return stats of Home Assistant."""
|
"""Return stats of Home Assistant."""
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -76,13 +76,6 @@ class PluginBase(ABC, FileConfiguration, CoreSysAttributes):
|
|||||||
"""Return True if a task is in progress."""
|
"""Return True if a task is in progress."""
|
||||||
return self.instance.in_progress
|
return self.instance.in_progress
|
||||||
|
|
||||||
def check_trust(self) -> Awaitable[None]:
|
|
||||||
"""Calculate plugin docker content trust.
|
|
||||||
|
|
||||||
Return Coroutine.
|
|
||||||
"""
|
|
||||||
return self.instance.check_trust()
|
|
||||||
|
|
||||||
def logs(self) -> Awaitable[bytes]:
|
def logs(self) -> Awaitable[bytes]:
|
||||||
"""Get docker plugin logs.
|
"""Get docker plugin logs.
|
||||||
|
|
||||||
|
|||||||
@@ -1,59 +0,0 @@
|
|||||||
"""Helpers to check supervisor trust."""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from ...const import CoreState
|
|
||||||
from ...coresys import CoreSys
|
|
||||||
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
|
|
||||||
from ..const import ContextType, IssueType, UnhealthyReason
|
|
||||||
from .base import CheckBase
|
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def setup(coresys: CoreSys) -> CheckBase:
|
|
||||||
"""Check setup function."""
|
|
||||||
return CheckSupervisorTrust(coresys)
|
|
||||||
|
|
||||||
|
|
||||||
class CheckSupervisorTrust(CheckBase):
|
|
||||||
"""CheckSystemTrust class for check."""
|
|
||||||
|
|
||||||
async def run_check(self) -> None:
|
|
||||||
"""Run check if not affected by issue."""
|
|
||||||
if not self.sys_security.content_trust:
|
|
||||||
_LOGGER.warning(
|
|
||||||
"Skipping %s, content_trust is globally disabled", self.slug
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await self.sys_supervisor.check_trust()
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
self.sys_resolution.add_unhealthy_reason(UnhealthyReason.UNTRUSTED)
|
|
||||||
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
|
|
||||||
except CodeNotaryError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def approve_check(self, reference: str | None = None) -> bool:
|
|
||||||
"""Approve check if it is affected by issue."""
|
|
||||||
try:
|
|
||||||
await self.sys_supervisor.check_trust()
|
|
||||||
except CodeNotaryError:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
@property
|
|
||||||
def issue(self) -> IssueType:
|
|
||||||
"""Return a IssueType enum."""
|
|
||||||
return IssueType.TRUST
|
|
||||||
|
|
||||||
@property
|
|
||||||
def context(self) -> ContextType:
|
|
||||||
"""Return a ContextType enum."""
|
|
||||||
return ContextType.SUPERVISOR
|
|
||||||
|
|
||||||
@property
|
|
||||||
def states(self) -> list[CoreState]:
|
|
||||||
"""Return a list of valid states when this check can run."""
|
|
||||||
return [CoreState.RUNNING, CoreState.STARTUP]
|
|
||||||
@@ -39,7 +39,6 @@ class UnsupportedReason(StrEnum):
|
|||||||
APPARMOR = "apparmor"
|
APPARMOR = "apparmor"
|
||||||
CGROUP_VERSION = "cgroup_version"
|
CGROUP_VERSION = "cgroup_version"
|
||||||
CONNECTIVITY_CHECK = "connectivity_check"
|
CONNECTIVITY_CHECK = "connectivity_check"
|
||||||
CONTENT_TRUST = "content_trust"
|
|
||||||
DBUS = "dbus"
|
DBUS = "dbus"
|
||||||
DNS_SERVER = "dns_server"
|
DNS_SERVER = "dns_server"
|
||||||
DOCKER_CONFIGURATION = "docker_configuration"
|
DOCKER_CONFIGURATION = "docker_configuration"
|
||||||
@@ -54,7 +53,6 @@ class UnsupportedReason(StrEnum):
|
|||||||
PRIVILEGED = "privileged"
|
PRIVILEGED = "privileged"
|
||||||
RESTART_POLICY = "restart_policy"
|
RESTART_POLICY = "restart_policy"
|
||||||
SOFTWARE = "software"
|
SOFTWARE = "software"
|
||||||
SOURCE_MODS = "source_mods"
|
|
||||||
SUPERVISOR_VERSION = "supervisor_version"
|
SUPERVISOR_VERSION = "supervisor_version"
|
||||||
SYSTEMD = "systemd"
|
SYSTEMD = "systemd"
|
||||||
SYSTEMD_JOURNAL = "systemd_journal"
|
SYSTEMD_JOURNAL = "systemd_journal"
|
||||||
@@ -103,7 +101,6 @@ class IssueType(StrEnum):
|
|||||||
PWNED = "pwned"
|
PWNED = "pwned"
|
||||||
REBOOT_REQUIRED = "reboot_required"
|
REBOOT_REQUIRED = "reboot_required"
|
||||||
SECURITY = "security"
|
SECURITY = "security"
|
||||||
TRUST = "trust"
|
|
||||||
UPDATE_FAILED = "update_failed"
|
UPDATE_FAILED = "update_failed"
|
||||||
UPDATE_ROLLBACK = "update_rollback"
|
UPDATE_ROLLBACK = "update_rollback"
|
||||||
|
|
||||||
@@ -115,7 +112,6 @@ class SuggestionType(StrEnum):
|
|||||||
CLEAR_FULL_BACKUP = "clear_full_backup"
|
CLEAR_FULL_BACKUP = "clear_full_backup"
|
||||||
CREATE_FULL_BACKUP = "create_full_backup"
|
CREATE_FULL_BACKUP = "create_full_backup"
|
||||||
DISABLE_BOOT = "disable_boot"
|
DISABLE_BOOT = "disable_boot"
|
||||||
EXECUTE_INTEGRITY = "execute_integrity"
|
|
||||||
EXECUTE_REBOOT = "execute_reboot"
|
EXECUTE_REBOOT = "execute_reboot"
|
||||||
EXECUTE_REBUILD = "execute_rebuild"
|
EXECUTE_REBUILD = "execute_rebuild"
|
||||||
EXECUTE_RELOAD = "execute_reload"
|
EXECUTE_RELOAD = "execute_reload"
|
||||||
|
|||||||
@@ -1,34 +0,0 @@
|
|||||||
"""Evaluation class for Content Trust."""
|
|
||||||
|
|
||||||
from ...const import CoreState
|
|
||||||
from ...coresys import CoreSys
|
|
||||||
from ..const import UnsupportedReason
|
|
||||||
from .base import EvaluateBase
|
|
||||||
|
|
||||||
|
|
||||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
|
||||||
"""Initialize evaluation-setup function."""
|
|
||||||
return EvaluateContentTrust(coresys)
|
|
||||||
|
|
||||||
|
|
||||||
class EvaluateContentTrust(EvaluateBase):
|
|
||||||
"""Evaluate system content trust level."""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def reason(self) -> UnsupportedReason:
|
|
||||||
"""Return a UnsupportedReason enum."""
|
|
||||||
return UnsupportedReason.CONTENT_TRUST
|
|
||||||
|
|
||||||
@property
|
|
||||||
def on_failure(self) -> str:
|
|
||||||
"""Return a string that is printed when self.evaluate is True."""
|
|
||||||
return "System run with disabled trusted content security."
|
|
||||||
|
|
||||||
@property
|
|
||||||
def states(self) -> list[CoreState]:
|
|
||||||
"""Return a list of valid states when this evaluation can run."""
|
|
||||||
return [CoreState.INITIALIZE, CoreState.SETUP, CoreState.RUNNING]
|
|
||||||
|
|
||||||
async def evaluate(self) -> bool:
|
|
||||||
"""Run evaluation."""
|
|
||||||
return not self.sys_security.content_trust
|
|
||||||
@@ -1,72 +0,0 @@
|
|||||||
"""Evaluation class for Content Trust."""
|
|
||||||
|
|
||||||
import errno
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from ...const import CoreState
|
|
||||||
from ...coresys import CoreSys
|
|
||||||
from ...exceptions import CodeNotaryError, CodeNotaryUntrusted
|
|
||||||
from ...utils.codenotary import calc_checksum_path_sourcecode
|
|
||||||
from ..const import ContextType, IssueType, UnhealthyReason, UnsupportedReason
|
|
||||||
from .base import EvaluateBase
|
|
||||||
|
|
||||||
_SUPERVISOR_SOURCE = Path("/usr/src/supervisor/supervisor")
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def setup(coresys: CoreSys) -> EvaluateBase:
|
|
||||||
"""Initialize evaluation-setup function."""
|
|
||||||
return EvaluateSourceMods(coresys)
|
|
||||||
|
|
||||||
|
|
||||||
class EvaluateSourceMods(EvaluateBase):
|
|
||||||
"""Evaluate supervisor source modifications."""
|
|
||||||
|
|
||||||
@property
|
|
||||||
def reason(self) -> UnsupportedReason:
|
|
||||||
"""Return a UnsupportedReason enum."""
|
|
||||||
return UnsupportedReason.SOURCE_MODS
|
|
||||||
|
|
||||||
@property
|
|
||||||
def on_failure(self) -> str:
|
|
||||||
"""Return a string that is printed when self.evaluate is True."""
|
|
||||||
return "System detect unauthorized source code modifications."
|
|
||||||
|
|
||||||
@property
|
|
||||||
def states(self) -> list[CoreState]:
|
|
||||||
"""Return a list of valid states when this evaluation can run."""
|
|
||||||
return [CoreState.RUNNING]
|
|
||||||
|
|
||||||
async def evaluate(self) -> bool:
|
|
||||||
"""Run evaluation."""
|
|
||||||
if not self.sys_security.content_trust:
|
|
||||||
_LOGGER.warning("Disabled content-trust, skipping evaluation")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Calculate sume of the sourcecode
|
|
||||||
try:
|
|
||||||
checksum = await self.sys_run_in_executor(
|
|
||||||
calc_checksum_path_sourcecode, _SUPERVISOR_SOURCE
|
|
||||||
)
|
|
||||||
except OSError as err:
|
|
||||||
if err.errno == errno.EBADMSG:
|
|
||||||
self.sys_resolution.add_unhealthy_reason(
|
|
||||||
UnhealthyReason.OSERROR_BAD_MESSAGE
|
|
||||||
)
|
|
||||||
|
|
||||||
self.sys_resolution.create_issue(
|
|
||||||
IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM
|
|
||||||
)
|
|
||||||
_LOGGER.error("Can't calculate checksum of source code: %s", err)
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Validate checksum
|
|
||||||
try:
|
|
||||||
await self.sys_security.verify_own_content(checksum)
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
return True
|
|
||||||
except CodeNotaryError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return False
|
|
||||||
@@ -1,67 +0,0 @@
|
|||||||
"""Helpers to check and fix issues with free space."""
|
|
||||||
|
|
||||||
from datetime import timedelta
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from ...coresys import CoreSys
|
|
||||||
from ...exceptions import ResolutionFixupError, ResolutionFixupJobError
|
|
||||||
from ...jobs.const import JobCondition, JobThrottle
|
|
||||||
from ...jobs.decorator import Job
|
|
||||||
from ...security.const import ContentTrustResult
|
|
||||||
from ..const import ContextType, IssueType, SuggestionType
|
|
||||||
from .base import FixupBase
|
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
def setup(coresys: CoreSys) -> FixupBase:
|
|
||||||
"""Check setup function."""
|
|
||||||
return FixupSystemExecuteIntegrity(coresys)
|
|
||||||
|
|
||||||
|
|
||||||
class FixupSystemExecuteIntegrity(FixupBase):
|
|
||||||
"""Storage class for fixup."""
|
|
||||||
|
|
||||||
@Job(
|
|
||||||
name="fixup_system_execute_integrity_process",
|
|
||||||
conditions=[JobCondition.INTERNET_SYSTEM],
|
|
||||||
on_condition=ResolutionFixupJobError,
|
|
||||||
throttle_period=timedelta(hours=8),
|
|
||||||
throttle=JobThrottle.THROTTLE,
|
|
||||||
)
|
|
||||||
async def process_fixup(self, reference: str | None = None) -> None:
|
|
||||||
"""Initialize the fixup class."""
|
|
||||||
result = await self.sys_security.integrity_check()
|
|
||||||
|
|
||||||
if ContentTrustResult.FAILED in (result.core, result.supervisor):
|
|
||||||
raise ResolutionFixupError()
|
|
||||||
|
|
||||||
for plugin in result.plugins:
|
|
||||||
if plugin != ContentTrustResult.FAILED:
|
|
||||||
continue
|
|
||||||
raise ResolutionFixupError()
|
|
||||||
|
|
||||||
for addon in result.addons:
|
|
||||||
if addon != ContentTrustResult.FAILED:
|
|
||||||
continue
|
|
||||||
raise ResolutionFixupError()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def suggestion(self) -> SuggestionType:
|
|
||||||
"""Return a SuggestionType enum."""
|
|
||||||
return SuggestionType.EXECUTE_INTEGRITY
|
|
||||||
|
|
||||||
@property
|
|
||||||
def context(self) -> ContextType:
|
|
||||||
"""Return a ContextType enum."""
|
|
||||||
return ContextType.SYSTEM
|
|
||||||
|
|
||||||
@property
|
|
||||||
def issues(self) -> list[IssueType]:
|
|
||||||
"""Return a IssueType enum list."""
|
|
||||||
return [IssueType.TRUST]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def auto(self) -> bool:
|
|
||||||
"""Return if a fixup can be apply as auto fix."""
|
|
||||||
return True
|
|
||||||
@@ -1,24 +0,0 @@
|
|||||||
"""Security constants."""
|
|
||||||
|
|
||||||
from enum import StrEnum
|
|
||||||
|
|
||||||
import attr
|
|
||||||
|
|
||||||
|
|
||||||
class ContentTrustResult(StrEnum):
|
|
||||||
"""Content trust result enum."""
|
|
||||||
|
|
||||||
PASS = "pass"
|
|
||||||
ERROR = "error"
|
|
||||||
FAILED = "failed"
|
|
||||||
UNTESTED = "untested"
|
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
|
||||||
class IntegrityResult:
|
|
||||||
"""Result of a full integrity check."""
|
|
||||||
|
|
||||||
supervisor: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
|
|
||||||
core: ContentTrustResult = attr.ib(default=ContentTrustResult.UNTESTED)
|
|
||||||
plugins: dict[str, ContentTrustResult] = attr.ib(default={})
|
|
||||||
addons: dict[str, ContentTrustResult] = attr.ib(default={})
|
|
||||||
@@ -4,27 +4,12 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from ..const import (
|
from ..const import ATTR_FORCE_SECURITY, ATTR_PWNED, FILE_HASSIO_SECURITY
|
||||||
ATTR_CONTENT_TRUST,
|
|
||||||
ATTR_FORCE_SECURITY,
|
|
||||||
ATTR_PWNED,
|
|
||||||
FILE_HASSIO_SECURITY,
|
|
||||||
)
|
|
||||||
from ..coresys import CoreSys, CoreSysAttributes
|
from ..coresys import CoreSys, CoreSysAttributes
|
||||||
from ..exceptions import (
|
from ..exceptions import PwnedError
|
||||||
CodeNotaryError,
|
|
||||||
CodeNotaryUntrusted,
|
|
||||||
PwnedError,
|
|
||||||
SecurityJobError,
|
|
||||||
)
|
|
||||||
from ..jobs.const import JobConcurrency
|
|
||||||
from ..jobs.decorator import Job, JobCondition
|
|
||||||
from ..resolution.const import ContextType, IssueType, SuggestionType
|
|
||||||
from ..utils.codenotary import cas_validate
|
|
||||||
from ..utils.common import FileConfiguration
|
from ..utils.common import FileConfiguration
|
||||||
from ..utils.pwned import check_pwned_password
|
from ..utils.pwned import check_pwned_password
|
||||||
from ..validate import SCHEMA_SECURITY_CONFIG
|
from ..validate import SCHEMA_SECURITY_CONFIG
|
||||||
from .const import ContentTrustResult, IntegrityResult
|
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -37,16 +22,6 @@ class Security(FileConfiguration, CoreSysAttributes):
|
|||||||
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
|
super().__init__(FILE_HASSIO_SECURITY, SCHEMA_SECURITY_CONFIG)
|
||||||
self.coresys = coresys
|
self.coresys = coresys
|
||||||
|
|
||||||
@property
|
|
||||||
def content_trust(self) -> bool:
|
|
||||||
"""Return if content trust is enabled/disabled."""
|
|
||||||
return self._data[ATTR_CONTENT_TRUST]
|
|
||||||
|
|
||||||
@content_trust.setter
|
|
||||||
def content_trust(self, value: bool) -> None:
|
|
||||||
"""Set content trust is enabled/disabled."""
|
|
||||||
self._data[ATTR_CONTENT_TRUST] = value
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def force(self) -> bool:
|
def force(self) -> bool:
|
||||||
"""Return if force security is enabled/disabled."""
|
"""Return if force security is enabled/disabled."""
|
||||||
@@ -67,30 +42,6 @@ class Security(FileConfiguration, CoreSysAttributes):
|
|||||||
"""Set pwned is enabled/disabled."""
|
"""Set pwned is enabled/disabled."""
|
||||||
self._data[ATTR_PWNED] = value
|
self._data[ATTR_PWNED] = value
|
||||||
|
|
||||||
async def verify_content(self, signer: str, checksum: str) -> None:
|
|
||||||
"""Verify content on CAS."""
|
|
||||||
if not self.content_trust:
|
|
||||||
_LOGGER.warning("Disabled content-trust, skip validation")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await cas_validate(signer, checksum)
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
raise
|
|
||||||
except CodeNotaryError:
|
|
||||||
if self.force:
|
|
||||||
raise
|
|
||||||
self.sys_resolution.create_issue(
|
|
||||||
IssueType.TRUST,
|
|
||||||
ContextType.SYSTEM,
|
|
||||||
suggestions=[SuggestionType.EXECUTE_INTEGRITY],
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
async def verify_own_content(self, checksum: str) -> None:
|
|
||||||
"""Verify content from HA org."""
|
|
||||||
return await self.verify_content("notary@home-assistant.io", checksum)
|
|
||||||
|
|
||||||
async def verify_secret(self, pwned_hash: str) -> None:
|
async def verify_secret(self, pwned_hash: str) -> None:
|
||||||
"""Verify pwned state of a secret."""
|
"""Verify pwned state of a secret."""
|
||||||
if not self.pwned:
|
if not self.pwned:
|
||||||
@@ -103,73 +54,3 @@ class Security(FileConfiguration, CoreSysAttributes):
|
|||||||
if self.force:
|
if self.force:
|
||||||
raise
|
raise
|
||||||
return
|
return
|
||||||
|
|
||||||
@Job(
|
|
||||||
name="security_manager_integrity_check",
|
|
||||||
conditions=[JobCondition.INTERNET_SYSTEM],
|
|
||||||
on_condition=SecurityJobError,
|
|
||||||
concurrency=JobConcurrency.REJECT,
|
|
||||||
)
|
|
||||||
async def integrity_check(self) -> IntegrityResult:
|
|
||||||
"""Run a full system integrity check of the platform.
|
|
||||||
|
|
||||||
We only allow to install trusted content.
|
|
||||||
This is a out of the band manual check.
|
|
||||||
"""
|
|
||||||
result: IntegrityResult = IntegrityResult()
|
|
||||||
if not self.content_trust:
|
|
||||||
_LOGGER.warning(
|
|
||||||
"Skipping integrity check, content_trust is globally disabled"
|
|
||||||
)
|
|
||||||
return result
|
|
||||||
|
|
||||||
# Supervisor
|
|
||||||
try:
|
|
||||||
await self.sys_supervisor.check_trust()
|
|
||||||
result.supervisor = ContentTrustResult.PASS
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
result.supervisor = ContentTrustResult.ERROR
|
|
||||||
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.SUPERVISOR)
|
|
||||||
except CodeNotaryError:
|
|
||||||
result.supervisor = ContentTrustResult.FAILED
|
|
||||||
|
|
||||||
# Core
|
|
||||||
try:
|
|
||||||
await self.sys_homeassistant.core.check_trust()
|
|
||||||
result.core = ContentTrustResult.PASS
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
result.core = ContentTrustResult.ERROR
|
|
||||||
self.sys_resolution.create_issue(IssueType.TRUST, ContextType.CORE)
|
|
||||||
except CodeNotaryError:
|
|
||||||
result.core = ContentTrustResult.FAILED
|
|
||||||
|
|
||||||
# Plugins
|
|
||||||
for plugin in self.sys_plugins.all_plugins:
|
|
||||||
try:
|
|
||||||
await plugin.check_trust()
|
|
||||||
result.plugins[plugin.slug] = ContentTrustResult.PASS
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
result.plugins[plugin.slug] = ContentTrustResult.ERROR
|
|
||||||
self.sys_resolution.create_issue(
|
|
||||||
IssueType.TRUST, ContextType.PLUGIN, reference=plugin.slug
|
|
||||||
)
|
|
||||||
except CodeNotaryError:
|
|
||||||
result.plugins[plugin.slug] = ContentTrustResult.FAILED
|
|
||||||
|
|
||||||
# Add-ons
|
|
||||||
for addon in self.sys_addons.installed:
|
|
||||||
if not addon.signed:
|
|
||||||
result.addons[addon.slug] = ContentTrustResult.UNTESTED
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
await addon.check_trust()
|
|
||||||
result.addons[addon.slug] = ContentTrustResult.PASS
|
|
||||||
except CodeNotaryUntrusted:
|
|
||||||
result.addons[addon.slug] = ContentTrustResult.ERROR
|
|
||||||
self.sys_resolution.create_issue(
|
|
||||||
IssueType.TRUST, ContextType.ADDON, reference=addon.slug
|
|
||||||
)
|
|
||||||
except CodeNotaryError:
|
|
||||||
result.addons[addon.slug] = ContentTrustResult.FAILED
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|||||||
@@ -25,8 +25,6 @@ from .coresys import CoreSys, CoreSysAttributes
|
|||||||
from .docker.stats import DockerStats
|
from .docker.stats import DockerStats
|
||||||
from .docker.supervisor import DockerSupervisor
|
from .docker.supervisor import DockerSupervisor
|
||||||
from .exceptions import (
|
from .exceptions import (
|
||||||
CodeNotaryError,
|
|
||||||
CodeNotaryUntrusted,
|
|
||||||
DockerError,
|
DockerError,
|
||||||
HostAppArmorError,
|
HostAppArmorError,
|
||||||
SupervisorAppArmorError,
|
SupervisorAppArmorError,
|
||||||
@@ -37,7 +35,6 @@ from .exceptions import (
|
|||||||
from .jobs.const import JobCondition, JobThrottle
|
from .jobs.const import JobCondition, JobThrottle
|
||||||
from .jobs.decorator import Job
|
from .jobs.decorator import Job
|
||||||
from .resolution.const import ContextType, IssueType, UnhealthyReason
|
from .resolution.const import ContextType, IssueType, UnhealthyReason
|
||||||
from .utils.codenotary import calc_checksum
|
|
||||||
from .utils.sentry import async_capture_exception
|
from .utils.sentry import async_capture_exception
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
||||||
@@ -150,20 +147,6 @@ class Supervisor(CoreSysAttributes):
|
|||||||
_LOGGER.error,
|
_LOGGER.error,
|
||||||
) from err
|
) from err
|
||||||
|
|
||||||
# Validate
|
|
||||||
try:
|
|
||||||
await self.sys_security.verify_own_content(calc_checksum(data))
|
|
||||||
except CodeNotaryUntrusted as err:
|
|
||||||
raise SupervisorAppArmorError(
|
|
||||||
"Content-Trust is broken for the AppArmor profile fetch!",
|
|
||||||
_LOGGER.critical,
|
|
||||||
) from err
|
|
||||||
except CodeNotaryError as err:
|
|
||||||
raise SupervisorAppArmorError(
|
|
||||||
f"CodeNotary error while processing AppArmor fetch: {err!s}",
|
|
||||||
_LOGGER.error,
|
|
||||||
) from err
|
|
||||||
|
|
||||||
# Load
|
# Load
|
||||||
temp_dir: TemporaryDirectory | None = None
|
temp_dir: TemporaryDirectory | None = None
|
||||||
|
|
||||||
@@ -273,13 +256,6 @@ class Supervisor(CoreSysAttributes):
|
|||||||
"""
|
"""
|
||||||
return self.instance.logs()
|
return self.instance.logs()
|
||||||
|
|
||||||
def check_trust(self) -> Awaitable[None]:
|
|
||||||
"""Calculate Supervisor docker content trust.
|
|
||||||
|
|
||||||
Return Coroutine.
|
|
||||||
"""
|
|
||||||
return self.instance.check_trust()
|
|
||||||
|
|
||||||
async def stats(self) -> DockerStats:
|
async def stats(self) -> DockerStats:
|
||||||
"""Return stats of Supervisor."""
|
"""Return stats of Supervisor."""
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -31,14 +31,8 @@ from .const import (
|
|||||||
UpdateChannel,
|
UpdateChannel,
|
||||||
)
|
)
|
||||||
from .coresys import CoreSys, CoreSysAttributes
|
from .coresys import CoreSys, CoreSysAttributes
|
||||||
from .exceptions import (
|
from .exceptions import UpdaterError, UpdaterJobError
|
||||||
CodeNotaryError,
|
|
||||||
CodeNotaryUntrusted,
|
|
||||||
UpdaterError,
|
|
||||||
UpdaterJobError,
|
|
||||||
)
|
|
||||||
from .jobs.decorator import Job, JobCondition
|
from .jobs.decorator import Job, JobCondition
|
||||||
from .utils.codenotary import calc_checksum
|
|
||||||
from .utils.common import FileConfiguration
|
from .utils.common import FileConfiguration
|
||||||
from .validate import SCHEMA_UPDATER_CONFIG
|
from .validate import SCHEMA_UPDATER_CONFIG
|
||||||
|
|
||||||
@@ -289,19 +283,6 @@ class Updater(FileConfiguration, CoreSysAttributes):
|
|||||||
self.sys_bus.remove_listener(self._connectivity_listener)
|
self.sys_bus.remove_listener(self._connectivity_listener)
|
||||||
self._connectivity_listener = None
|
self._connectivity_listener = None
|
||||||
|
|
||||||
# Validate
|
|
||||||
try:
|
|
||||||
await self.sys_security.verify_own_content(calc_checksum(data))
|
|
||||||
except CodeNotaryUntrusted as err:
|
|
||||||
raise UpdaterError(
|
|
||||||
"Content-Trust is broken for the version file fetch!", _LOGGER.critical
|
|
||||||
) from err
|
|
||||||
except CodeNotaryError as err:
|
|
||||||
raise UpdaterError(
|
|
||||||
f"CodeNotary error while processing version fetch: {err!s}",
|
|
||||||
_LOGGER.error,
|
|
||||||
) from err
|
|
||||||
|
|
||||||
# Parse data
|
# Parse data
|
||||||
try:
|
try:
|
||||||
data = json.loads(data)
|
data = json.loads(data)
|
||||||
|
|||||||
@@ -1,109 +0,0 @@
|
|||||||
"""Small wrapper for CodeNotary."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import hashlib
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
|
||||||
import shlex
|
|
||||||
from typing import Final
|
|
||||||
|
|
||||||
from dirhash import dirhash
|
|
||||||
|
|
||||||
from ..exceptions import CodeNotaryBackendError, CodeNotaryError, CodeNotaryUntrusted
|
|
||||||
from . import clean_env
|
|
||||||
|
|
||||||
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
_CAS_CMD: str = (
|
|
||||||
"cas authenticate --signerID {signer} --silent --output json --hash {sum}"
|
|
||||||
)
|
|
||||||
_CACHE: set[tuple[str, str]] = set()
|
|
||||||
|
|
||||||
|
|
||||||
_ATTR_ERROR: Final = "error"
|
|
||||||
_ATTR_STATUS: Final = "status"
|
|
||||||
_FALLBACK_ERROR: Final = "Unknown CodeNotary backend issue"
|
|
||||||
|
|
||||||
|
|
||||||
def calc_checksum(data: str | bytes) -> str:
|
|
||||||
"""Generate checksum for CodeNotary."""
|
|
||||||
if isinstance(data, str):
|
|
||||||
return hashlib.sha256(data.encode()).hexdigest()
|
|
||||||
return hashlib.sha256(data).hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
def calc_checksum_path_sourcecode(folder: Path) -> str:
|
|
||||||
"""Calculate checksum for a path source code.
|
|
||||||
|
|
||||||
Need catch OSError.
|
|
||||||
"""
|
|
||||||
return dirhash(folder.as_posix(), "sha256", match=["*.py"])
|
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=unreachable
|
|
||||||
async def cas_validate(
|
|
||||||
signer: str,
|
|
||||||
checksum: str,
|
|
||||||
) -> None:
|
|
||||||
"""Validate data against CodeNotary."""
|
|
||||||
return
|
|
||||||
if (checksum, signer) in _CACHE:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Generate command for request
|
|
||||||
command = shlex.split(_CAS_CMD.format(signer=signer, sum=checksum))
|
|
||||||
|
|
||||||
# Request notary authorization
|
|
||||||
_LOGGER.debug("Send cas command: %s", command)
|
|
||||||
try:
|
|
||||||
proc = await asyncio.create_subprocess_exec(
|
|
||||||
*command,
|
|
||||||
stdin=asyncio.subprocess.DEVNULL,
|
|
||||||
stdout=asyncio.subprocess.PIPE,
|
|
||||||
stderr=asyncio.subprocess.PIPE,
|
|
||||||
env=clean_env(),
|
|
||||||
)
|
|
||||||
|
|
||||||
async with asyncio.timeout(15):
|
|
||||||
data, error = await proc.communicate()
|
|
||||||
except TimeoutError:
|
|
||||||
raise CodeNotaryBackendError(
|
|
||||||
"Timeout while processing CodeNotary", _LOGGER.warning
|
|
||||||
) from None
|
|
||||||
except OSError as err:
|
|
||||||
raise CodeNotaryError(
|
|
||||||
f"CodeNotary fatal error: {err!s}", _LOGGER.critical
|
|
||||||
) from err
|
|
||||||
|
|
||||||
# Check if Notarized
|
|
||||||
if proc.returncode != 0 and not data:
|
|
||||||
if error:
|
|
||||||
try:
|
|
||||||
error = error.decode("utf-8")
|
|
||||||
except UnicodeDecodeError as err:
|
|
||||||
raise CodeNotaryBackendError(_FALLBACK_ERROR, _LOGGER.warning) from err
|
|
||||||
if "not notarized" in error:
|
|
||||||
raise CodeNotaryUntrusted()
|
|
||||||
else:
|
|
||||||
error = _FALLBACK_ERROR
|
|
||||||
raise CodeNotaryBackendError(error, _LOGGER.warning)
|
|
||||||
|
|
||||||
# Parse data
|
|
||||||
try:
|
|
||||||
data_json = json.loads(data)
|
|
||||||
_LOGGER.debug("CodeNotary response with: %s", data_json)
|
|
||||||
except (json.JSONDecodeError, UnicodeDecodeError) as err:
|
|
||||||
raise CodeNotaryError(
|
|
||||||
f"Can't parse CodeNotary output: {data!s} - {err!s}", _LOGGER.error
|
|
||||||
) from err
|
|
||||||
|
|
||||||
if _ATTR_ERROR in data_json:
|
|
||||||
raise CodeNotaryBackendError(data_json[_ATTR_ERROR], _LOGGER.warning)
|
|
||||||
|
|
||||||
if data_json[_ATTR_STATUS] == 0:
|
|
||||||
_CACHE.add((checksum, signer))
|
|
||||||
else:
|
|
||||||
raise CodeNotaryUntrusted()
|
|
||||||
@@ -12,7 +12,6 @@ from .const import (
|
|||||||
ATTR_AUTO_UPDATE,
|
ATTR_AUTO_UPDATE,
|
||||||
ATTR_CHANNEL,
|
ATTR_CHANNEL,
|
||||||
ATTR_CLI,
|
ATTR_CLI,
|
||||||
ATTR_CONTENT_TRUST,
|
|
||||||
ATTR_COUNTRY,
|
ATTR_COUNTRY,
|
||||||
ATTR_DEBUG,
|
ATTR_DEBUG,
|
||||||
ATTR_DEBUG_BLOCK,
|
ATTR_DEBUG_BLOCK,
|
||||||
@@ -229,7 +228,6 @@ SCHEMA_INGRESS_CONFIG = vol.Schema(
|
|||||||
# pylint: disable=no-value-for-parameter
|
# pylint: disable=no-value-for-parameter
|
||||||
SCHEMA_SECURITY_CONFIG = vol.Schema(
|
SCHEMA_SECURITY_CONFIG = vol.Schema(
|
||||||
{
|
{
|
||||||
vol.Optional(ATTR_CONTENT_TRUST, default=True): vol.Boolean(),
|
|
||||||
vol.Optional(ATTR_PWNED, default=True): vol.Boolean(),
|
vol.Optional(ATTR_PWNED, default=True): vol.Boolean(),
|
||||||
vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(),
|
vol.Optional(ATTR_FORCE_SECURITY, default=False): vol.Boolean(),
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -17,16 +17,6 @@ async def test_api_security_options_force_security(api_client, coresys: CoreSys)
|
|||||||
assert coresys.security.force
|
assert coresys.security.force
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_api_security_options_content_trust(api_client, coresys: CoreSys):
|
|
||||||
"""Test security options content trust."""
|
|
||||||
assert coresys.security.content_trust
|
|
||||||
|
|
||||||
await api_client.post("/security/options", json={"content_trust": False})
|
|
||||||
|
|
||||||
assert not coresys.security.content_trust
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_api_security_options_pwned(api_client, coresys: CoreSys):
|
async def test_api_security_options_pwned(api_client, coresys: CoreSys):
|
||||||
"""Test security options pwned."""
|
"""Test security options pwned."""
|
||||||
@@ -41,11 +31,8 @@ async def test_api_security_options_pwned(api_client, coresys: CoreSys):
|
|||||||
async def test_api_integrity_check(
|
async def test_api_integrity_check(
|
||||||
api_client, coresys: CoreSys, supervisor_internet: AsyncMock
|
api_client, coresys: CoreSys, supervisor_internet: AsyncMock
|
||||||
):
|
):
|
||||||
"""Test security integrity check."""
|
"""Test security integrity check - now deprecated."""
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
resp = await api_client.post("/security/integrity")
|
resp = await api_client.post("/security/integrity")
|
||||||
result = await resp.json()
|
|
||||||
|
|
||||||
assert result["data"]["core"] == "untested"
|
# CodeNotary integrity check has been removed, should return 410 Gone
|
||||||
assert result["data"]["supervisor"] == "untested"
|
assert resp.status == 410
|
||||||
|
|||||||
@@ -31,15 +31,6 @@ from supervisor.jobs import JobSchedulerOptions, SupervisorJob
|
|||||||
from tests.common import load_json_fixture
|
from tests.common import load_json_fixture
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def mock_verify_content(coresys: CoreSys):
|
|
||||||
"""Mock verify_content utility during tests."""
|
|
||||||
with patch.object(
|
|
||||||
coresys.security, "verify_content", return_value=None
|
|
||||||
) as verify_content:
|
|
||||||
yield verify_content
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"cpu_arch, platform",
|
"cpu_arch, platform",
|
||||||
[
|
[
|
||||||
|
|||||||
@@ -181,7 +181,6 @@ async def test_reload_updater_triggers_supervisor_update(
|
|||||||
"""Test an updater reload triggers a supervisor update if there is one."""
|
"""Test an updater reload triggers a supervisor update if there is one."""
|
||||||
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
coresys.hardware.disk.get_disk_free_space = lambda x: 5000
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
await coresys.core.set_state(CoreState.RUNNING)
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(
|
patch.object(
|
||||||
|
|||||||
@@ -17,7 +17,6 @@ from supervisor.exceptions import (
|
|||||||
AudioJobError,
|
AudioJobError,
|
||||||
CliError,
|
CliError,
|
||||||
CliJobError,
|
CliJobError,
|
||||||
CodeNotaryUntrusted,
|
|
||||||
CoreDNSError,
|
CoreDNSError,
|
||||||
CoreDNSJobError,
|
CoreDNSJobError,
|
||||||
DockerError,
|
DockerError,
|
||||||
@@ -337,14 +336,12 @@ async def test_repair_failed(
|
|||||||
patch.object(
|
patch.object(
|
||||||
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
|
DockerInterface, "arch", new=PropertyMock(return_value=CpuArch.AMD64)
|
||||||
),
|
),
|
||||||
patch(
|
patch.object(DockerInterface, "install", side_effect=DockerError),
|
||||||
"supervisor.security.module.cas_validate", side_effect=CodeNotaryUntrusted
|
|
||||||
),
|
|
||||||
):
|
):
|
||||||
await plugin.repair()
|
await plugin.repair()
|
||||||
|
|
||||||
capture_exception.assert_called_once()
|
capture_exception.assert_called_once()
|
||||||
assert check_exception_chain(capture_exception.call_args[0][0], CodeNotaryUntrusted)
|
assert check_exception_chain(capture_exception.call_args[0][0], DockerError)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
|||||||
@@ -51,7 +51,6 @@ async def test_if_check_make_issue(coresys: CoreSys):
|
|||||||
"""Test check for setup."""
|
"""Test check for setup."""
|
||||||
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
|
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
await coresys.core.set_state(CoreState.RUNNING)
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
|
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
|
||||||
await coresys.resolution.check.check_system()
|
await coresys.resolution.check.check_system()
|
||||||
@@ -63,7 +62,6 @@ async def test_if_check_cleanup_issue(coresys: CoreSys):
|
|||||||
"""Test check for setup."""
|
"""Test check for setup."""
|
||||||
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
|
free_space = Issue(IssueType.FREE_SPACE, ContextType.SYSTEM)
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
await coresys.core.set_state(CoreState.RUNNING)
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
|
with patch("shutil.disk_usage", return_value=(1, 1, 1)):
|
||||||
await coresys.resolution.check.check_system()
|
await coresys.resolution.check.check_system()
|
||||||
|
|||||||
@@ -1,96 +0,0 @@
|
|||||||
"""Test Check Supervisor trust."""
|
|
||||||
|
|
||||||
# pylint: disable=import-error,protected-access
|
|
||||||
from unittest.mock import AsyncMock, patch
|
|
||||||
|
|
||||||
from supervisor.const import CoreState
|
|
||||||
from supervisor.coresys import CoreSys
|
|
||||||
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
|
|
||||||
from supervisor.resolution.checks.supervisor_trust import CheckSupervisorTrust
|
|
||||||
from supervisor.resolution.const import IssueType, UnhealthyReason
|
|
||||||
|
|
||||||
|
|
||||||
async def test_base(coresys: CoreSys):
|
|
||||||
"""Test check basics."""
|
|
||||||
supervisor_trust = CheckSupervisorTrust(coresys)
|
|
||||||
assert supervisor_trust.slug == "supervisor_trust"
|
|
||||||
assert supervisor_trust.enabled
|
|
||||||
|
|
||||||
|
|
||||||
async def test_check(coresys: CoreSys):
|
|
||||||
"""Test check."""
|
|
||||||
supervisor_trust = CheckSupervisorTrust(coresys)
|
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
|
||||||
|
|
||||||
assert len(coresys.resolution.issues) == 0
|
|
||||||
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
|
|
||||||
await supervisor_trust.run_check()
|
|
||||||
assert coresys.supervisor.check_trust.called
|
|
||||||
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(return_value=None)
|
|
||||||
await supervisor_trust.run_check()
|
|
||||||
assert coresys.supervisor.check_trust.called
|
|
||||||
|
|
||||||
assert len(coresys.resolution.issues) == 0
|
|
||||||
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
await supervisor_trust.run_check()
|
|
||||||
assert coresys.supervisor.check_trust.called
|
|
||||||
|
|
||||||
assert len(coresys.resolution.issues) == 1
|
|
||||||
assert coresys.resolution.issues[-1].type == IssueType.TRUST
|
|
||||||
|
|
||||||
assert UnhealthyReason.UNTRUSTED in coresys.resolution.unhealthy
|
|
||||||
|
|
||||||
|
|
||||||
async def test_approve(coresys: CoreSys):
|
|
||||||
"""Test check."""
|
|
||||||
supervisor_trust = CheckSupervisorTrust(coresys)
|
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
|
||||||
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
assert await supervisor_trust.approve_check()
|
|
||||||
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(return_value=None)
|
|
||||||
assert not await supervisor_trust.approve_check()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_with_global_disable(coresys: CoreSys, caplog):
|
|
||||||
"""Test when pwned is globally disabled."""
|
|
||||||
coresys.security.content_trust = False
|
|
||||||
supervisor_trust = CheckSupervisorTrust(coresys)
|
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
|
||||||
|
|
||||||
assert len(coresys.resolution.issues) == 0
|
|
||||||
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
await supervisor_trust.run_check()
|
|
||||||
assert not coresys.security.verify_own_content.called
|
|
||||||
assert (
|
|
||||||
"Skipping supervisor_trust, content_trust is globally disabled" in caplog.text
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_did_run(coresys: CoreSys):
|
|
||||||
"""Test that the check ran as expected."""
|
|
||||||
supervisor_trust = CheckSupervisorTrust(coresys)
|
|
||||||
should_run = supervisor_trust.states
|
|
||||||
should_not_run = [state for state in CoreState if state not in should_run]
|
|
||||||
assert len(should_run) != 0
|
|
||||||
assert len(should_not_run) != 0
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.resolution.checks.supervisor_trust.CheckSupervisorTrust.run_check",
|
|
||||||
return_value=None,
|
|
||||||
) as check:
|
|
||||||
for state in should_run:
|
|
||||||
await coresys.core.set_state(state)
|
|
||||||
await supervisor_trust()
|
|
||||||
check.assert_called_once()
|
|
||||||
check.reset_mock()
|
|
||||||
|
|
||||||
for state in should_not_run:
|
|
||||||
await coresys.core.set_state(state)
|
|
||||||
await supervisor_trust()
|
|
||||||
check.assert_not_called()
|
|
||||||
check.reset_mock()
|
|
||||||
@@ -1,46 +0,0 @@
|
|||||||
"""Test evaluation base."""
|
|
||||||
|
|
||||||
# pylint: disable=import-error,protected-access
|
|
||||||
from unittest.mock import patch
|
|
||||||
|
|
||||||
from supervisor.const import CoreState
|
|
||||||
from supervisor.coresys import CoreSys
|
|
||||||
from supervisor.resolution.evaluations.content_trust import EvaluateContentTrust
|
|
||||||
|
|
||||||
|
|
||||||
async def test_evaluation(coresys: CoreSys):
|
|
||||||
"""Test evaluation."""
|
|
||||||
job_conditions = EvaluateContentTrust(coresys)
|
|
||||||
await coresys.core.set_state(CoreState.SETUP)
|
|
||||||
|
|
||||||
await job_conditions()
|
|
||||||
assert job_conditions.reason not in coresys.resolution.unsupported
|
|
||||||
|
|
||||||
coresys.security.content_trust = False
|
|
||||||
await job_conditions()
|
|
||||||
assert job_conditions.reason in coresys.resolution.unsupported
|
|
||||||
|
|
||||||
|
|
||||||
async def test_did_run(coresys: CoreSys):
|
|
||||||
"""Test that the evaluation ran as expected."""
|
|
||||||
job_conditions = EvaluateContentTrust(coresys)
|
|
||||||
should_run = job_conditions.states
|
|
||||||
should_not_run = [state for state in CoreState if state not in should_run]
|
|
||||||
assert len(should_run) != 0
|
|
||||||
assert len(should_not_run) != 0
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.resolution.evaluations.content_trust.EvaluateContentTrust.evaluate",
|
|
||||||
return_value=None,
|
|
||||||
) as evaluate:
|
|
||||||
for state in should_run:
|
|
||||||
await coresys.core.set_state(state)
|
|
||||||
await job_conditions()
|
|
||||||
evaluate.assert_called_once()
|
|
||||||
evaluate.reset_mock()
|
|
||||||
|
|
||||||
for state in should_not_run:
|
|
||||||
await coresys.core.set_state(state)
|
|
||||||
await job_conditions()
|
|
||||||
evaluate.assert_not_called()
|
|
||||||
evaluate.reset_mock()
|
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
"""Test evaluation base."""
|
|
||||||
|
|
||||||
# pylint: disable=import-error,protected-access
|
|
||||||
import errno
|
|
||||||
import os
|
|
||||||
from pathlib import Path
|
|
||||||
from unittest.mock import AsyncMock, patch
|
|
||||||
|
|
||||||
from supervisor.const import CoreState
|
|
||||||
from supervisor.coresys import CoreSys
|
|
||||||
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
|
|
||||||
from supervisor.resolution.const import ContextType, IssueType
|
|
||||||
from supervisor.resolution.data import Issue
|
|
||||||
from supervisor.resolution.evaluations.source_mods import EvaluateSourceMods
|
|
||||||
|
|
||||||
|
|
||||||
async def test_evaluation(coresys: CoreSys):
|
|
||||||
"""Test evaluation."""
|
|
||||||
with patch(
|
|
||||||
"supervisor.resolution.evaluations.source_mods._SUPERVISOR_SOURCE",
|
|
||||||
Path(f"{os.getcwd()}/supervisor"),
|
|
||||||
):
|
|
||||||
sourcemods = EvaluateSourceMods(coresys)
|
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
|
||||||
|
|
||||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
|
||||||
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
await sourcemods()
|
|
||||||
assert sourcemods.reason in coresys.resolution.unsupported
|
|
||||||
|
|
||||||
coresys.security.verify_own_content = AsyncMock(side_effect=CodeNotaryError)
|
|
||||||
await sourcemods()
|
|
||||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
|
||||||
|
|
||||||
coresys.security.verify_own_content = AsyncMock()
|
|
||||||
await sourcemods()
|
|
||||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
|
||||||
|
|
||||||
|
|
||||||
async def test_did_run(coresys: CoreSys):
|
|
||||||
"""Test that the evaluation ran as expected."""
|
|
||||||
sourcemods = EvaluateSourceMods(coresys)
|
|
||||||
should_run = sourcemods.states
|
|
||||||
should_not_run = [state for state in CoreState if state not in should_run]
|
|
||||||
assert len(should_run) != 0
|
|
||||||
assert len(should_not_run) != 0
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.resolution.evaluations.source_mods.EvaluateSourceMods.evaluate",
|
|
||||||
return_value=None,
|
|
||||||
) as evaluate:
|
|
||||||
for state in should_run:
|
|
||||||
await coresys.core.set_state(state)
|
|
||||||
await sourcemods()
|
|
||||||
evaluate.assert_called_once()
|
|
||||||
evaluate.reset_mock()
|
|
||||||
|
|
||||||
for state in should_not_run:
|
|
||||||
await coresys.core.set_state(state)
|
|
||||||
await sourcemods()
|
|
||||||
evaluate.assert_not_called()
|
|
||||||
evaluate.reset_mock()
|
|
||||||
|
|
||||||
|
|
||||||
async def test_evaluation_error(coresys: CoreSys):
|
|
||||||
"""Test error reading file during evaluation."""
|
|
||||||
sourcemods = EvaluateSourceMods(coresys)
|
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
|
||||||
corrupt_fs = Issue(IssueType.CORRUPT_FILESYSTEM, ContextType.SYSTEM)
|
|
||||||
|
|
||||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
|
||||||
assert corrupt_fs not in coresys.resolution.issues
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.utils.codenotary.dirhash",
|
|
||||||
side_effect=(err := OSError()),
|
|
||||||
):
|
|
||||||
err.errno = errno.EBUSY
|
|
||||||
await sourcemods()
|
|
||||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
|
||||||
assert corrupt_fs in coresys.resolution.issues
|
|
||||||
assert coresys.core.healthy is True
|
|
||||||
|
|
||||||
coresys.resolution.dismiss_issue(corrupt_fs)
|
|
||||||
err.errno = errno.EBADMSG
|
|
||||||
await sourcemods()
|
|
||||||
assert sourcemods.reason not in coresys.resolution.unsupported
|
|
||||||
assert corrupt_fs in coresys.resolution.issues
|
|
||||||
assert coresys.core.healthy is False
|
|
||||||
@@ -1,69 +0,0 @@
|
|||||||
"""Test evaluation base."""
|
|
||||||
|
|
||||||
# pylint: disable=import-error,protected-access
|
|
||||||
from datetime import timedelta
|
|
||||||
from unittest.mock import AsyncMock
|
|
||||||
|
|
||||||
import time_machine
|
|
||||||
|
|
||||||
from supervisor.coresys import CoreSys
|
|
||||||
from supervisor.resolution.const import ContextType, IssueType, SuggestionType
|
|
||||||
from supervisor.resolution.data import Issue, Suggestion
|
|
||||||
from supervisor.resolution.fixups.system_execute_integrity import (
|
|
||||||
FixupSystemExecuteIntegrity,
|
|
||||||
)
|
|
||||||
from supervisor.security.const import ContentTrustResult, IntegrityResult
|
|
||||||
from supervisor.utils.dt import utcnow
|
|
||||||
|
|
||||||
|
|
||||||
async def test_fixup(coresys: CoreSys, supervisor_internet: AsyncMock):
|
|
||||||
"""Test fixup."""
|
|
||||||
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
|
|
||||||
|
|
||||||
assert system_execute_integrity.auto
|
|
||||||
|
|
||||||
coresys.resolution.add_suggestion(
|
|
||||||
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
|
|
||||||
)
|
|
||||||
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
|
|
||||||
|
|
||||||
coresys.security.integrity_check = AsyncMock(
|
|
||||||
return_value=IntegrityResult(
|
|
||||||
ContentTrustResult.PASS,
|
|
||||||
ContentTrustResult.PASS,
|
|
||||||
{"audio": ContentTrustResult.PASS},
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
await system_execute_integrity()
|
|
||||||
|
|
||||||
assert coresys.security.integrity_check.called
|
|
||||||
assert len(coresys.resolution.suggestions) == 0
|
|
||||||
assert len(coresys.resolution.issues) == 0
|
|
||||||
|
|
||||||
|
|
||||||
async def test_fixup_error(coresys: CoreSys, supervisor_internet: AsyncMock):
|
|
||||||
"""Test fixup."""
|
|
||||||
system_execute_integrity = FixupSystemExecuteIntegrity(coresys)
|
|
||||||
|
|
||||||
assert system_execute_integrity.auto
|
|
||||||
|
|
||||||
coresys.resolution.add_suggestion(
|
|
||||||
Suggestion(SuggestionType.EXECUTE_INTEGRITY, ContextType.SYSTEM)
|
|
||||||
)
|
|
||||||
coresys.resolution.add_issue(Issue(IssueType.TRUST, ContextType.SYSTEM))
|
|
||||||
|
|
||||||
coresys.security.integrity_check = AsyncMock(
|
|
||||||
return_value=IntegrityResult(
|
|
||||||
ContentTrustResult.FAILED,
|
|
||||||
ContentTrustResult.PASS,
|
|
||||||
{"audio": ContentTrustResult.PASS},
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
with time_machine.travel(utcnow() + timedelta(hours=24)):
|
|
||||||
await system_execute_integrity()
|
|
||||||
|
|
||||||
assert coresys.security.integrity_check.called
|
|
||||||
assert len(coresys.resolution.suggestions) == 1
|
|
||||||
assert len(coresys.resolution.issues) == 1
|
|
||||||
@@ -1,21 +1,15 @@
|
|||||||
"""Test evaluations."""
|
"""Test evaluations."""
|
||||||
|
|
||||||
from unittest.mock import Mock, patch
|
from unittest.mock import Mock
|
||||||
|
|
||||||
from supervisor.const import CoreState
|
from supervisor.const import CoreState
|
||||||
from supervisor.coresys import CoreSys
|
from supervisor.coresys import CoreSys
|
||||||
from supervisor.utils import check_exception_chain
|
|
||||||
|
|
||||||
|
|
||||||
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
|
async def test_evaluate_system_error(coresys: CoreSys, capture_exception: Mock):
|
||||||
"""Test error while evaluating system."""
|
"""Test error while evaluating system."""
|
||||||
await coresys.core.set_state(CoreState.RUNNING)
|
await coresys.core.set_state(CoreState.RUNNING)
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.resolution.evaluations.source_mods.calc_checksum_path_sourcecode",
|
|
||||||
side_effect=RuntimeError,
|
|
||||||
):
|
|
||||||
await coresys.resolution.evaluate.evaluate_system()
|
await coresys.resolution.evaluate.evaluate_system()
|
||||||
|
|
||||||
capture_exception.assert_called_once()
|
capture_exception.assert_not_called()
|
||||||
assert check_exception_chain(capture_exception.call_args[0][0], RuntimeError)
|
|
||||||
|
|||||||
@@ -1,127 +0,0 @@
|
|||||||
"""Testing handling with Security."""
|
|
||||||
|
|
||||||
from unittest.mock import AsyncMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from supervisor.coresys import CoreSys
|
|
||||||
from supervisor.exceptions import CodeNotaryError, CodeNotaryUntrusted
|
|
||||||
from supervisor.security.const import ContentTrustResult
|
|
||||||
|
|
||||||
|
|
||||||
async def test_content_trust(coresys: CoreSys):
|
|
||||||
"""Test Content-Trust."""
|
|
||||||
|
|
||||||
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
|
|
||||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
|
||||||
assert cas_validate.called
|
|
||||||
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.security.module.cas_validate", AsyncMock()
|
|
||||||
) as cas_validate:
|
|
||||||
await coresys.security.verify_own_content("ffffffffffffff")
|
|
||||||
assert cas_validate.called
|
|
||||||
cas_validate.assert_called_once_with(
|
|
||||||
"notary@home-assistant.io", "ffffffffffffff"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_disabled_content_trust(coresys: CoreSys):
|
|
||||||
"""Test Content-Trust."""
|
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
|
|
||||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
|
||||||
assert not cas_validate.called
|
|
||||||
|
|
||||||
with patch("supervisor.security.module.cas_validate", AsyncMock()) as cas_validate:
|
|
||||||
await coresys.security.verify_own_content("ffffffffffffff")
|
|
||||||
assert not cas_validate.called
|
|
||||||
|
|
||||||
|
|
||||||
async def test_force_content_trust(coresys: CoreSys):
|
|
||||||
"""Force Content-Trust tests."""
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.security.module.cas_validate",
|
|
||||||
AsyncMock(side_effect=CodeNotaryError),
|
|
||||||
) as cas_validate:
|
|
||||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
|
||||||
assert cas_validate.called
|
|
||||||
cas_validate.assert_called_once_with("test@mail.com", "ffffffffffffff")
|
|
||||||
|
|
||||||
coresys.security.force = True
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch(
|
|
||||||
"supervisor.security.module.cas_validate",
|
|
||||||
AsyncMock(side_effect=CodeNotaryError),
|
|
||||||
) as cas_validate,
|
|
||||||
pytest.raises(CodeNotaryError),
|
|
||||||
):
|
|
||||||
await coresys.security.verify_content("test@mail.com", "ffffffffffffff")
|
|
||||||
|
|
||||||
|
|
||||||
async def test_integrity_check_disabled(coresys: CoreSys):
|
|
||||||
"""Test integrity check with disabled content trust."""
|
|
||||||
coresys.security.content_trust = False
|
|
||||||
|
|
||||||
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
|
|
||||||
|
|
||||||
assert result.core == ContentTrustResult.UNTESTED
|
|
||||||
assert result.supervisor == ContentTrustResult.UNTESTED
|
|
||||||
|
|
||||||
|
|
||||||
async def test_integrity_check(coresys: CoreSys, install_addon_ssh):
|
|
||||||
"""Test integrity check with content trust."""
|
|
||||||
coresys.homeassistant.core.check_trust = AsyncMock()
|
|
||||||
coresys.supervisor.check_trust = AsyncMock()
|
|
||||||
install_addon_ssh.check_trust = AsyncMock()
|
|
||||||
install_addon_ssh.data["codenotary"] = "test@example.com"
|
|
||||||
|
|
||||||
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
|
|
||||||
|
|
||||||
assert result.core == ContentTrustResult.PASS
|
|
||||||
assert result.supervisor == ContentTrustResult.PASS
|
|
||||||
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.PASS
|
|
||||||
|
|
||||||
|
|
||||||
async def test_integrity_check_error(coresys: CoreSys, install_addon_ssh):
|
|
||||||
"""Test integrity check with content trust issues."""
|
|
||||||
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryUntrusted)
|
|
||||||
install_addon_ssh.data["codenotary"] = "test@example.com"
|
|
||||||
|
|
||||||
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
|
|
||||||
|
|
||||||
assert result.core == ContentTrustResult.ERROR
|
|
||||||
assert result.supervisor == ContentTrustResult.ERROR
|
|
||||||
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.ERROR
|
|
||||||
|
|
||||||
|
|
||||||
async def test_integrity_check_failed(coresys: CoreSys, install_addon_ssh):
|
|
||||||
"""Test integrity check with content trust failed."""
|
|
||||||
coresys.homeassistant.core.check_trust = AsyncMock(side_effect=CodeNotaryError)
|
|
||||||
coresys.supervisor.check_trust = AsyncMock(side_effect=CodeNotaryError)
|
|
||||||
install_addon_ssh.check_trust = AsyncMock(side_effect=CodeNotaryError)
|
|
||||||
install_addon_ssh.data["codenotary"] = "test@example.com"
|
|
||||||
|
|
||||||
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
|
|
||||||
|
|
||||||
assert result.core == ContentTrustResult.FAILED
|
|
||||||
assert result.supervisor == ContentTrustResult.FAILED
|
|
||||||
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.FAILED
|
|
||||||
|
|
||||||
|
|
||||||
async def test_integrity_check_addon(coresys: CoreSys, install_addon_ssh):
|
|
||||||
"""Test integrity check with content trust but no signed add-ons."""
|
|
||||||
coresys.homeassistant.core.check_trust = AsyncMock()
|
|
||||||
coresys.supervisor.check_trust = AsyncMock()
|
|
||||||
|
|
||||||
result = await coresys.security.integrity_check.__wrapped__(coresys.security)
|
|
||||||
|
|
||||||
assert result.core == ContentTrustResult.PASS
|
|
||||||
assert result.supervisor == ContentTrustResult.PASS
|
|
||||||
assert result.addons[install_addon_ssh.slug] == ContentTrustResult.UNTESTED
|
|
||||||
@@ -86,7 +86,6 @@ async def test_os_update_path(
|
|||||||
"""Test OS upgrade path across major versions."""
|
"""Test OS upgrade path across major versions."""
|
||||||
coresys.os._board = "rpi4" # pylint: disable=protected-access
|
coresys.os._board = "rpi4" # pylint: disable=protected-access
|
||||||
coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access
|
coresys.os._version = AwesomeVersion(version) # pylint: disable=protected-access
|
||||||
with patch.object(type(coresys.security), "verify_own_content"):
|
|
||||||
await coresys.updater.fetch_data()
|
await coresys.updater.fetch_data()
|
||||||
|
|
||||||
assert coresys.updater.version_hassos == AwesomeVersion(expected)
|
assert coresys.updater.version_hassos == AwesomeVersion(expected)
|
||||||
@@ -105,7 +104,6 @@ async def test_delayed_fetch_for_connectivity(
|
|||||||
load_binary_fixture("version_stable.json")
|
load_binary_fixture("version_stable.json")
|
||||||
)
|
)
|
||||||
coresys.websession.head = AsyncMock()
|
coresys.websession.head = AsyncMock()
|
||||||
coresys.security.verify_own_content = AsyncMock()
|
|
||||||
|
|
||||||
# Network connectivity change causes a series of async tasks to eventually do a version fetch
|
# Network connectivity change causes a series of async tasks to eventually do a version fetch
|
||||||
# Rather then use some kind of sleep loop, set up listener for start of fetch data job
|
# Rather then use some kind of sleep loop, set up listener for start of fetch data job
|
||||||
|
|||||||
@@ -1,128 +0,0 @@
|
|||||||
"""Test CodeNotary."""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from unittest.mock import AsyncMock, Mock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from supervisor.exceptions import (
|
|
||||||
CodeNotaryBackendError,
|
|
||||||
CodeNotaryError,
|
|
||||||
CodeNotaryUntrusted,
|
|
||||||
)
|
|
||||||
from supervisor.utils.codenotary import calc_checksum, cas_validate
|
|
||||||
|
|
||||||
pytest.skip("code notary has been disabled due to issues", allow_module_level=True)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class SubprocessResponse:
|
|
||||||
"""Class for specifying subprocess exec response."""
|
|
||||||
|
|
||||||
returncode: int = 0
|
|
||||||
data: str = ""
|
|
||||||
error: str | None = None
|
|
||||||
exception: Exception | None = None
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="subprocess_exec")
|
|
||||||
def fixture_subprocess_exec(request):
|
|
||||||
"""Mock subprocess exec with specific return."""
|
|
||||||
response = request.param
|
|
||||||
if response.exception:
|
|
||||||
communicate_return = AsyncMock(side_effect=response.exception)
|
|
||||||
else:
|
|
||||||
communicate_return = AsyncMock(return_value=(response.data, response.error))
|
|
||||||
|
|
||||||
exec_return = Mock(returncode=response.returncode, communicate=communicate_return)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"supervisor.utils.codenotary.asyncio.create_subprocess_exec",
|
|
||||||
return_value=exec_return,
|
|
||||||
) as subprocess_exec:
|
|
||||||
yield subprocess_exec
|
|
||||||
|
|
||||||
|
|
||||||
def test_checksum_calc():
|
|
||||||
"""Calc Checkusm as test."""
|
|
||||||
assert calc_checksum("test") == calc_checksum(b"test")
|
|
||||||
assert (
|
|
||||||
calc_checksum("test")
|
|
||||||
== "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_valid_checksum():
|
|
||||||
"""Test a valid autorization."""
|
|
||||||
await cas_validate(
|
|
||||||
"notary@home-assistant.io",
|
|
||||||
"4434a33ff9c695e870bc5bbe04230ea3361ecf4c129eb06133dd1373975a43f0",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def test_invalid_checksum():
|
|
||||||
"""Test a invalid autorization."""
|
|
||||||
with pytest.raises(CodeNotaryUntrusted):
|
|
||||||
await cas_validate(
|
|
||||||
"notary@home-assistant.io",
|
|
||||||
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"subprocess_exec",
|
|
||||||
[SubprocessResponse(returncode=1, error=b"x is not notarized")],
|
|
||||||
)
|
|
||||||
async def test_not_notarized_error(subprocess_exec):
|
|
||||||
"""Test received a not notarized error response from command."""
|
|
||||||
with pytest.raises(CodeNotaryUntrusted):
|
|
||||||
await cas_validate(
|
|
||||||
"notary@home-assistant.io",
|
|
||||||
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"subprocess_exec",
|
|
||||||
[
|
|
||||||
SubprocessResponse(returncode=1, error=b"test"),
|
|
||||||
SubprocessResponse(returncode=0, data='{"error":"asn1: structure error"}'),
|
|
||||||
SubprocessResponse(returncode=1, error="test".encode("utf-16")),
|
|
||||||
],
|
|
||||||
indirect=True,
|
|
||||||
)
|
|
||||||
async def test_cas_backend_error(subprocess_exec):
|
|
||||||
"""Test backend error executing cas command."""
|
|
||||||
with pytest.raises(CodeNotaryBackendError):
|
|
||||||
await cas_validate(
|
|
||||||
"notary@home-assistant.io",
|
|
||||||
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"subprocess_exec",
|
|
||||||
[SubprocessResponse(returncode=0, data='{"status":1}')],
|
|
||||||
indirect=True,
|
|
||||||
)
|
|
||||||
async def test_cas_notarized_untrusted(subprocess_exec):
|
|
||||||
"""Test cas found notarized but untrusted content."""
|
|
||||||
with pytest.raises(CodeNotaryUntrusted):
|
|
||||||
await cas_validate(
|
|
||||||
"notary@home-assistant.io",
|
|
||||||
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"subprocess_exec", [SubprocessResponse(exception=OSError())], indirect=True
|
|
||||||
)
|
|
||||||
async def test_cas_exec_os_error(subprocess_exec):
|
|
||||||
"""Test os error attempting to execute cas command."""
|
|
||||||
with pytest.raises(CodeNotaryError):
|
|
||||||
await cas_validate(
|
|
||||||
"notary@home-assistant.io",
|
|
||||||
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
|
|
||||||
)
|
|
||||||
Reference in New Issue
Block a user